1. Add more warnings for the gst core only. Various trival fixes to quiet the warnings.
[platform/upstream/gstreamer.git] / gst / gstscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
25
26 #include "gstscheduler.h"
27
28
29 static int
30 gst_schedule_loopfunc_wrapper (int argc,char *argv[])
31 {
32   GstElement *element = GST_ELEMENT (argv);
33   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
34
35   GST_DEBUG_ENTER("(%d,'%s')",argc,name);
36
37   do {
38     GST_DEBUG (GST_CAT_DATAFLOW,"calling loopfunc %s for element %s\n",
39                GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40     (element->loopfunc) (element);
41     GST_DEBUG (GST_CAT_DATAFLOW,"element %s ended loop function\n", name);
42   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
43   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
44
45   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
46   return 0;
47 }
48
49 static int
50 gst_schedule_chain_wrapper (int argc,char *argv[])
51 {
52   GstElement *element = GST_ELEMENT (argv);
53   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
54
55   GST_DEBUG_ENTER("(\"%s\")",name);
56
57   GST_DEBUG (GST_CAT_DATAFLOW,"stepping through pads\n");
58
59   do {
60     GList *pads = element->pads;
61     
62     while (pads) {
63       GstPad *pad = GST_PAD (pads->data);
64       GstRealPad *realpad;
65
66       pads = g_list_next (pads);
67       if (!GST_IS_REAL_PAD(pad)) 
68         continue;
69       realpad = GST_REAL_PAD(pad);
70       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
71         GstBuffer *buf;
72
73         GST_DEBUG (GST_CAT_DATAFLOW,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
74         buf = gst_pad_pull (pad);
75         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
76         if (buf) 
77           GST_RPAD_CHAINFUNC(realpad) (pad,buf);
78         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
79       }
80     }
81   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
82   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
83
84   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
85   return 0;
86 }
87
88 static int
89 gst_schedule_src_wrapper (int argc,char *argv[])
90 {
91   GstElement *element = GST_ELEMENT (argv);
92   GList *pads;
93   GstRealPad *realpad;
94   GstBuffer *buf = NULL;
95   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
96
97   GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
98
99   do {
100     pads = element->pads;
101     while (pads) {
102       if (!GST_IS_REAL_PAD(pads->data)) continue;
103       realpad = (GstRealPad*)(pads->data);
104       pads = g_list_next(pads);
105       if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
106         GST_DEBUG (GST_CAT_DATAFLOW,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
107         if (realpad->regiontype != GST_REGION_VOID) {
108           g_return_val_if_fail (GST_RPAD_GETREGIONFUNC(realpad) != NULL, 0);
109 //          if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
110 //            fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
111 //          else
112           buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad,realpad->regiontype,realpad->offset,realpad->len);
113           realpad->regiontype = GST_REGION_VOID;
114         } else {
115           g_return_val_if_fail (GST_RPAD_GETFUNC(realpad) != NULL, 0);
116 //          if (GST_RPAD_GETFUNC(realpad) == NULL)
117 //            fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
118 //          else
119           buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
120         }
121
122         GST_DEBUG (GST_CAT_DATAFLOW,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
123         gst_pad_push ((GstPad*)realpad, buf);
124       }
125     }
126   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
127   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
128
129   GST_DEBUG_LEAVE("");
130   return 0;
131 }
132
133 static void
134 gst_schedule_chainhandler_proxy (GstPad *pad, GstBuffer *buf)
135 {
136   GstRealPad *peer = GST_RPAD_PEER(pad);
137
138   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
139   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer \"%s:%s\"'s pen\n",buf,GST_DEBUG_PAD_NAME(peer));
140
141   // FIXME this should be bounded
142   // loop until the bufferpen is empty so we can fill it up again
143   while (GST_RPAD_BUFPEN(pad) != NULL) {
144     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
145                GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
146     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
147
148     // we may no longer be the same pad, check.
149     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
150       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
151       pad = (GstPad *)GST_RPAD_PEER(peer);
152     }
153   }
154
155   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
156   // now fill the bufferpen and switch so it can be consumed
157   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
158   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
159   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
160
161   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
162 }
163
164 static void
165 gst_schedule_select_proxy (GstPad *pad, GstBuffer *buf)
166 {
167   GstRealPad *peer = GST_RPAD_PEER(pad);
168
169   g_print ("select proxy (%s:%s)\n",GST_DEBUG_PAD_NAME(pad));
170
171   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
172
173   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
174
175   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
176   // now fill the bufferpen and switch so it can be consumed
177   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
178   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
179   g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
180   GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
181   GST_FLAG_UNSET(GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
182   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
183
184   g_print ("done switching\n");
185   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
186 }
187
188
189 static GstBuffer*
190 gst_schedule_gethandler_proxy (GstPad *pad)
191 {
192   GstBuffer *buf;
193   GstRealPad *peer = GST_RPAD_PEER(pad);
194
195   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
196
197   // FIXME this should be bounded
198   // we will loop switching to the peer until it's filled up the bufferpen
199   while (GST_RPAD_BUFPEN(pad) == NULL) {
200     GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
201                GST_ELEMENT_NAME(GST_ELEMENT(GST_PAD_PARENT(pad))),
202                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
203     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
204
205     // we may no longer be the same pad, check.
206     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
207       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
208       pad = (GstPad *)GST_RPAD_PEER(peer);
209     }
210   }
211   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
212
213   // now grab the buffer from the pen, clear the pen, and return the buffer
214   buf = GST_RPAD_BUFPEN(pad);
215   GST_RPAD_BUFPEN(pad) = NULL;
216   return buf;
217 }
218
219 static GstBuffer*
220 gst_schedule_pullregionfunc_proxy (GstPad *pad,GstRegionType type,guint64 offset,guint64 len)
221 {
222   GstBuffer *buf;
223   GstRealPad *peer = GST_RPAD_PEER(pad);
224
225   GST_DEBUG_ENTER("%s:%s,%d,%lld,%lld",GST_DEBUG_PAD_NAME(pad),type,offset,len);
226
227   // put the region info into the pad
228   GST_RPAD_REGIONTYPE(pad) = type;
229   GST_RPAD_OFFSET(pad) = offset;
230   GST_RPAD_LEN(pad) = len;
231
232   // FIXME this should be bounded
233   // we will loop switching to the peer until it's filled up the bufferpen
234   while (GST_RPAD_BUFPEN(pad) == NULL) {
235     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
236                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
237     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
238
239     // we may no longer be the same pad, check.
240     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
241       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
242       pad = (GstPad *)GST_RPAD_PEER(peer);
243     }
244   }
245   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
246
247   // now grab the buffer from the pen, clear the pen, and return the buffer
248   buf = GST_RPAD_BUFPEN(pad);
249   GST_RPAD_BUFPEN(pad) = NULL;
250   return buf;
251 }
252
253
254 static void
255 gst_schedule_cothreaded_chain (GstBin *bin, GstScheduleChain *chain) {
256   GList *elements;
257   GstElement *element;
258   cothread_func wrapper_function;
259   GList *pads;
260   GstPad *pad;
261
262   GST_DEBUG (GST_CAT_SCHEDULING,"chain is using COTHREADS\n");
263
264   // first create thread context
265   if (bin->threadcontext == NULL) {
266     GST_DEBUG (GST_CAT_SCHEDULING,"initializing cothread context\n");
267     bin->threadcontext = cothread_init ();
268   }
269
270   // walk through all the chain's elements
271   elements = chain->elements;
272   while (elements) {
273     element = GST_ELEMENT (elements->data);
274     elements = g_list_next (elements);
275
276     // start out without a wrapper function, we select it later
277     wrapper_function = NULL;
278
279     // if the element has a loopfunc...
280     if (element->loopfunc != NULL) {
281       wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_loopfunc_wrapper);
282       GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
283     } else {
284       // otherwise we need to decide what kind of cothread
285       // if it's not DECOUPLED, we decide based on whether it's a source or not
286       if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
287         // if it doesn't have any sinks, it must be a source (duh)
288         if (element->numsinkpads == 0) {
289           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_src_wrapper);
290           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
291         } else {
292           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_chain_wrapper);
293           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
294         }
295       }
296     }
297
298     // now we have to walk through the pads to set up their state
299     pads = gst_element_get_pad_list (element);
300     while (pads) {
301       pad = GST_PAD (pads->data);
302       pads = g_list_next (pads);
303       if (!GST_IS_REAL_PAD(pad)) continue;
304
305       // if the element is DECOUPLED or outside the manager, we have to chain
306       if ((wrapper_function == NULL) ||
307           (GST_RPAD_PEER(pad) &&
308            (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
309          ) {
310         // set the chain proxies
311         if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
312           GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
313           GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
314         } else {
315           GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
316           GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
317           GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
318         }
319
320       // otherwise we really are a cothread
321       } else {
322         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
323           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
324           GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_chainhandler_proxy);
325         } else {
326           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
327           GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_gethandler_proxy);
328           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullregionfunc_proxy);
329         }
330       }
331     }
332
333     // need to set up the cothread now
334     if (wrapper_function != NULL) {
335       if (element->threadstate == NULL) {
336         // FIXME handle cothread_create returning NULL
337         element->threadstate = cothread_create (bin->threadcontext);
338         GST_DEBUG (GST_CAT_SCHEDULING,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
339       }
340       cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
341       GST_DEBUG (GST_CAT_SCHEDULING,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
342             GST_DEBUG_FUNCPTR_NAME(wrapper_function));
343     }
344   }
345 }
346
347 G_GNUC_UNUSED static void
348 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
349   GList *elements;
350   GstElement *element;
351   GList *pads;
352   GstPad *pad;
353
354   GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
355   // walk through all the elements
356   elements = chain->elements;
357   while (elements) {
358     element = GST_ELEMENT (elements->data);
359     elements = g_list_next (elements);
360
361     // walk through all the pads
362     pads = gst_element_get_pad_list (element);
363     while (pads) {
364       pad = GST_PAD (pads->data);
365       pads = g_list_next (pads);
366       if (!GST_IS_REAL_PAD(pad)) continue;
367
368       if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
369         GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
370         GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
371       } else {
372         GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
373         GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
374         GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
375       }
376     }
377   }
378 }
379
380 /* depracated!! */
381 static void
382 gst_bin_schedule_cleanup (GstBin *bin)
383 {
384   GList *chains;
385   _GstBinChain *chain;
386
387   chains = bin->chains;
388   while (chains) {
389     chain = (_GstBinChain *)(chains->data);
390     chains = g_list_next(chains);
391
392 //    g_list_free(chain->disabled);
393     g_list_free(chain->elements);
394     g_list_free(chain->entries);
395
396     g_free(chain);
397   }
398   g_list_free(bin->chains);
399
400   bin->chains = NULL;
401 }
402
403 static void
404 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
405 {
406   GST_DEBUG (GST_CAT_SCHEDULING,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
407   chain->need_scheduling = FALSE;
408 }
409
410 /*
411 void gst_bin_schedule_func(GstBin *bin) {
412   GList *elements;
413   GstElement *element;
414   GSList *pending = NULL;
415   GList *pads;
416   GstPad *pad;
417   GstElement *peerparent;
418   GList *chains;
419   GstScheduleChain *chain;
420
421   GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
422
423   gst_bin_schedule_cleanup(bin);
424
425   // next we have to find all the separate scheduling chains
426   GST_DEBUG (GST_CAT_SCHEDULING,"attempting to find scheduling chains...\n");
427   // first make a copy of the managed_elements we can mess with
428   elements = g_list_copy (bin->managed_elements);
429   // we have to repeat until the list is empty to get all chains
430   while (elements) {
431     element = GST_ELEMENT (elements->data);
432
433     // if this is a DECOUPLED element
434     if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
435       // skip this element entirely
436       GST_DEBUG (GST_CAT_SCHEDULING,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
437       elements = g_list_next (elements);
438       continue;
439     }
440
441     GST_DEBUG (GST_CAT_SCHEDULING,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
442
443     // prime the pending list with the first element off the top
444     pending = g_slist_prepend (NULL, element);
445     // and remove that one from the main list
446     elements = g_list_remove (elements, element);
447
448     // create a chain structure
449     chain = g_new0 (_GstBinChain, 1);
450     chain->need_scheduling = TRUE;
451
452     // for each pending element, walk the pipeline
453     do {
454       // retrieve the top of the stack and pop it
455       element = GST_ELEMENT (pending->data);
456       pending = g_slist_remove (pending, element);
457
458       // add ourselves to the chain's list of elements
459       GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
460       chain->elements = g_list_prepend (chain->elements, element);
461       chain->num_elements++;
462       gtk_signal_connect (G_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
463       // set the cothreads flag as appropriate
464       if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
465         chain->need_cothreads = TRUE;
466       if (bin->use_cothreads == TRUE)
467         chain->need_cothreads = TRUE;
468
469       // if we're managed by the current bin, and we're not decoupled,
470       // go find all the peers and add them to the list of elements to check
471       if ((element->manager == GST_ELEMENT(bin)) &&
472           !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
473         // remove ourselves from the outer list of all managed elements
474 //        GST_DEBUG (GST_CAT_SCHEDULING,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
475         elements = g_list_remove (elements, element);
476
477         // if this element is a source, add it as an entry
478         if (element->numsinkpads == 0) {
479           chain->entries = g_list_prepend (chain->entries, element);
480           GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
481         }
482
483         // now we have to walk the pads to find peers
484         pads = gst_element_get_pad_list (element);
485         while (pads) {
486           pad = GST_PAD (pads->data);
487           pads = g_list_next (pads);
488           if (!GST_IS_REAL_PAD(pad)) continue;
489           GST_DEBUG (GST_CAT_SCHEDULING,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
490
491           if (GST_RPAD_PEER(pad) == NULL) continue;
492           if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
493           g_assert(GST_RPAD_PEER(pad) != NULL);
494           g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
495
496           peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
497
498           GST_DEBUG (GST_CAT_SCHEDULING,"peer pad %p\n", GST_RPAD_PEER(pad));
499           // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
500           // only add it if it's in the list of un-visited elements still
501           if ((g_list_find (elements, peerparent) != NULL) ||
502               GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
503             // add the peer element to the pending list
504             GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to list of pending elements\n",
505                        GST_ELEMENT_NAME(peerparent));
506             pending = g_slist_prepend (pending, peerparent);
507
508             // if this is a sink pad, then the element on the other side is an entry
509             if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
510                 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
511               chain->entries = g_list_prepend (chain->entries, peerparent);
512               gtk_signal_connect (G_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
513               GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
514             }
515           } else
516             GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
517         }
518       }
519     } while (pending);
520
521     // add the chain to the bin
522     GST_DEBUG (GST_CAT_SCHEDULING,"have chain with %d elements: ",chain->num_elements);
523     { GList *elements = chain->elements;
524       while (elements) {
525         element = GST_ELEMENT (elements->data);
526         elements = g_list_next(elements);
527         GST_DEBUG_NOPREFIX(GST_CAT_SCHEDULING,"%s, ",GST_ELEMENT_NAME(element));
528       }
529     }
530     GST_DEBUG_NOPREFIX(GST_CAT_DATAFLOW,"\n");
531     bin->chains = g_list_prepend (bin->chains, chain);
532     bin->num_chains++;
533   }
534   // free up the list in case it's full of DECOUPLED elements
535   g_list_free (elements);
536
537   GST_DEBUG (GST_CAT_SCHEDULING,"\nwe have %d chains to schedule\n",bin->num_chains);
538
539   // now we have to go through all the chains and schedule them
540   chains = bin->chains;
541   while (chains) {
542     chain = (GstScheduleChain *)(chains->data);
543     chains = g_list_next (chains);
544
545     // schedule as appropriate
546     if (chain->need_cothreads) {
547       gst_schedule_cothreaded_chain (bin,chain);
548     } else {
549       gst_schedule_chained_chain (bin,chain);
550     }
551   }
552
553   GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
554 }
555 */
556
557
558 /*
559         // ***** check for possible connections outside
560         // get the pad's peer
561         peer = gst_pad_get_peer (pad);
562         // FIXME this should be an error condition, if not disabled
563         if (!peer) break;
564         // get the parent of the peer of the pad
565         outside = GST_ELEMENT (gst_pad_get_parent (peer));
566         // FIXME this should *really* be an error condition
567         if (!outside) break;
568         // if it's a source or connection and it's not ours...
569         if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
570             (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
571           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
572             GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
573 //            GST_DEBUG (0,"PUNT: copying gethandler ptr from %s:%s to %s:%s (@ %p)\n",
574 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->gethandler);
575 //            pad->gethandler = pad->peer->gethandler;
576 //            GST_DEBUG (0,"PUNT: setting chainhandler proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
577 //            pad->peer->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_fake_proxy);
578             GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
579             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
580           }
581         } else {
582 */
583
584
585
586
587
588 /*
589       } else if (GST_IS_SRC (element)) {
590         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
591         bin->entries = g_list_prepend (bin->entries,element);
592         bin->num_entries++;
593         cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
594       }
595
596       pads = gst_element_get_pad_list (element);
597       while (pads) {
598         pad = GST_PAD(pads->data);
599
600         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
601           GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
602           // set the proxy functions
603           pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
604           GST_DEBUG (0,"chainhandler %p = gst_bin_chainhandler_proxy %p\n",&pad->chainhandler,gst_bin_chainhandler_proxy);
605         } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
606           GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
607           // set the proxy functions
608           GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
609           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
610           GST_DEBUG (0,"pad->gethandler(@%p) = gst_bin_gethandler_proxy(@%p)\n",
611                 &pad->gethandler,gst_bin_gethandler_proxy);
612           pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
613         }
614         pads = g_list_next (pads);
615       }
616       elements = g_list_next (elements);
617
618       // if there are no entries, we have to pick one at random
619       if (bin->num_entries == 0)
620         bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
621     }
622   } else {
623     GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
624     // we have to find which elements will drive an iteration
625     elements = bin->children;
626     while (elements) {
627       element = GST_ELEMENT (elements->data);
628       GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
629       if (GST_IS_BIN (element)) {
630         gst_bin_create_plan (GST_BIN (element));
631       }
632       if (GST_IS_SRC (element)) {
633         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
634         bin->entries = g_list_prepend (bin->entries, element);
635         bin->num_entries++;
636       }
637
638       // go through all the pads, set pointers, and check for connections
639       pads = gst_element_get_pad_list (element);
640       while (pads) {
641         pad = GST_PAD (pads->data);
642
643         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
644           GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
645
646           // copy the peer's chain function, easy enough
647           GST_DEBUG (0,"copying peer's chainfunc to %s:%s's chainhandler\n",GST_DEBUG_PAD_NAME(pad));
648           GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
649
650           // need to walk through and check for outside connections
651 //FIXME need to do this for all pads
652           // get the pad's peer
653           peer = GST_RPAD_PEER(pad);
654           if (!peer) {
655             GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
656             break;
657           }
658           // get the parent of the peer of the pad
659           outside = GST_ELEMENT (GST_PAD_PARENT(peer));
660           if (!outside) break;
661           // if it's a connection and it's not ours...
662           if (GST_IS_CONNECTION (outside) &&
663                (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
664             gst_info("gstbin: element \"%s\" is the external source Connection "
665                                     "for internal element \"%s\"\n",
666                           GST_ELEMENT_NAME (GST_ELEMENT (outside)),
667                           GST_ELEMENT_NAME (GST_ELEMENT (element)));
668             bin->entries = g_list_prepend (bin->entries, outside);
669             bin->num_entries++;
670           }
671         }
672         else {
673           GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
674         }
675         pads = g_list_next (pads);
676
677       }
678       elements = g_list_next (elements);
679     }
680 */
681
682
683
684
685 /*
686   // If cothreads are needed, we need to not only find elements but
687   // set up cothread states and various proxy functions.
688   if (bin->need_cothreads) {
689     GST_DEBUG (0,"bin is using cothreads\n");
690
691     // first create thread context
692     if (bin->threadcontext == NULL) {
693       GST_DEBUG (0,"initializing cothread context\n");
694       bin->threadcontext = cothread_init ();
695     }
696
697     // walk through all the children
698     elements = bin->managed_elements;
699     while (elements) {
700       element = GST_ELEMENT (elements->data);
701       elements = g_list_next (elements);
702
703       // start out with a NULL warpper function, we'll set it if we want a cothread
704       wrapper_function = NULL;
705
706       // have to decide if we need to or can use a cothreads, and if so which wrapper
707       // first of all, if there's a loopfunc, the decision's already made
708       if (element->loopfunc != NULL) {
709         wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
710         GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
711       } else {
712         // otherwise we need to decide if it needs a cothread
713         // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
714         if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
715             (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
716              !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
717           // base it on whether we're going to loop through source or sink pads
718           if (element->numsinkpads == 0)
719             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
720           else
721             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
722         }
723       }
724
725       // walk through the all the pads for this element, setting proxy functions
726       // the selection of proxy functions depends on whether we're in a cothread or not
727       pads = gst_element_get_pad_list (element);
728       while (pads) {
729         pad = GST_PAD (pads->data);
730         pads = g_list_next (pads);
731
732         // check to see if someone else gets to set up the element
733         peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
734         if (peer_manager != GST_ELEMENT(bin)) {
735           GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
736         }
737
738         // if the wrapper_function is set, we need to use the proxy functions
739         if (wrapper_function != NULL) {
740           // set up proxy functions
741           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
742             GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
743             pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
744           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
745             GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
746             GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
747             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
748           }
749         } else {
750           // otherwise we need to set up for 'traditional' chaining
751           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
752             // we can just copy the chain function, since it shares the prototype
753             GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
754                   GST_DEBUG_PAD_NAME(pad));
755             pad->chainhandler = pad->chainfunc;
756           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
757             // we can just copy the get function, since it shares the prototype
758             GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
759                   GST_DEBUG_PAD_NAME(pad));
760             pad->gethandler = pad->getfunc;
761           }
762         }
763       }
764
765       // if a loopfunc has been specified, create and set up a cothread
766       if (wrapper_function != NULL) {
767         if (element->threadstate == NULL) {
768           element->threadstate = cothread_create (bin->threadcontext);
769           GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
770                 &element->threadstate,GST_ELEMENT_NAME (element));
771         }
772         cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
773         GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
774               GST_DEBUG_FUNCPTR_NAME(wrapper_function));
775       }
776
777 //      // HACK: if the element isn't decoupled, it's an entry
778 //      if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
779 //        bin->entries = g_list_append(bin->entries, element);
780     }
781
782   // otherwise, cothreads are not needed
783   } else {
784     GST_DEBUG (0,"bin is chained, no cothreads needed\n");
785
786     elements = bin->managed_elements;
787     while (elements) {
788       element = GST_ELEMENT (elements->data);
789       elements = g_list_next (elements);
790
791       pads = gst_element_get_pad_list (element);
792       while (pads) {
793         pad = GST_PAD (pads->data);
794         pads = g_list_next (pads);
795
796         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
797           GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
798           pad->chainhandler = pad->chainfunc;
799         } else {
800           GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
801           pad->gethandler = pad->getfunc;
802         }
803       }
804     }
805   }
806 */
807
808 static void 
809 gst_schedule_lock_element (GstSchedule *sched,GstElement *element)
810 {
811   if (element->threadstate)
812     cothread_lock(element->threadstate);
813 }
814
815 static void
816 gst_schedule_unlock_element (GstSchedule *sched,GstElement *element)
817 {
818   if (element->threadstate)
819     cothread_unlock(element->threadstate);
820 }
821
822
823 /*************** INCREMENTAL SCHEDULING CODE STARTS HERE ***************/
824
825
826 static void     gst_schedule_class_init (GstScheduleClass *klass);
827 static void     gst_schedule_init       (GstSchedule *schedule);
828
829 static GstObjectClass *parent_class = NULL;
830
831 GType gst_schedule_get_type(void) {
832   static GType schedule_type = 0;
833
834   if (!schedule_type) {
835     static const GTypeInfo schedule_info = {
836       sizeof(GstScheduleClass),
837       NULL,
838       NULL,
839       (GClassInitFunc)gst_schedule_class_init,
840       NULL,
841       NULL,
842       sizeof(GstSchedule),
843       0,
844       (GInstanceInitFunc)gst_schedule_init,
845       NULL
846     };
847     schedule_type = g_type_register_static(GST_TYPE_OBJECT, "GstSchedule", &schedule_info, 0);
848   }
849   return schedule_type;
850 }
851
852 static void
853 gst_schedule_class_init (GstScheduleClass *klass)
854 {
855   parent_class = g_type_class_ref(GST_TYPE_OBJECT);
856 }
857
858 static void
859 gst_schedule_init (GstSchedule *schedule)
860 {
861   schedule->add_element = GST_DEBUG_FUNCPTR(gst_schedule_add_element);
862   schedule->remove_element = GST_DEBUG_FUNCPTR(gst_schedule_remove_element);
863   schedule->enable_element = GST_DEBUG_FUNCPTR(gst_schedule_enable_element);
864   schedule->disable_element = GST_DEBUG_FUNCPTR(gst_schedule_disable_element);
865   schedule->lock_element = GST_DEBUG_FUNCPTR(gst_schedule_lock_element);
866   schedule->unlock_element = GST_DEBUG_FUNCPTR(gst_schedule_unlock_element);
867   schedule->pad_connect = GST_DEBUG_FUNCPTR(gst_schedule_pad_connect);
868   schedule->pad_disconnect = GST_DEBUG_FUNCPTR(gst_schedule_pad_disconnect);
869   schedule->pad_select = GST_DEBUG_FUNCPTR(gst_schedule_pad_select);
870   schedule->iterate = GST_DEBUG_FUNCPTR(gst_schedule_iterate);
871 }
872
873 GstSchedule*
874 gst_schedule_new(GstElement *parent)
875 {
876   GstSchedule *sched = GST_SCHEDULE (g_object_new(GST_TYPE_SCHEDULE,NULL));
877
878   sched->parent = parent;
879
880   return sched;
881 }
882
883
884 /* this function will look at a pad and determine if the peer parent is
885  * a possible candidate for connecting up in the same chain. */
886 /* DEPRACATED !!!!
887 GstElement *gst_schedule_check_pad (GstSchedule *sched, GstPad *pad) {
888   GstRealPad *peer;
889   GstElement *element, *peerelement;
890
891   GST_INFO (GST_CAT_SCHEDULING, "checking pad %s:%s for peer in scheduler",
892             GST_DEBUG_PAD_NAME(pad));
893
894   element = GST_ELEMENT(GST_PAD_PARENT(peer));
895   GST_DEBUG(GST_CAT_SCHEDULING, "element is \"%s\"\n",GST_ELEMENT_NAME(element));
896
897   peer = GST_PAD_PEER (pad);
898   if (peer == NULL) return NULL;
899   peerelement = GST_ELEMENT(GST_PAD_PARENT (peer));
900   if (peerelement == NULL) return NULL;
901   GST_DEBUG(GST_CAT_SCHEDULING, "peer element is \"%s\"\n",GST_ELEMENT_NAME(peerelement));
902
903   // now check to see if it's in the same schedule
904   if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
905     GST_DEBUG(GST_CAT_SCHEDULING, "peer is in same schedule\n");
906     return peerelement;
907   }
908
909   // otherwise it's not a candidate
910   return NULL;
911 }
912 */
913
914 static GstScheduleChain *
915 gst_schedule_chain_new (GstSchedule *sched)
916 {
917   GstScheduleChain *chain = g_new (GstScheduleChain, 1);
918
919   // initialize the chain with sane values
920   chain->sched = sched;
921   chain->disabled = NULL;
922   chain->elements = NULL;
923   chain->num_elements = 0;
924   chain->entry = NULL;
925   chain->cothreaded_elements = 0;
926   chain->schedule = FALSE;
927
928   // add the chain to the schedules' list of chains
929   sched->chains = g_list_prepend (sched->chains, chain);
930   sched->num_chains++;
931
932   GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
933             chain,sched->num_chains,sched);
934
935   return chain;
936 }
937
938 static void
939 gst_schedule_chain_destroy (GstScheduleChain *chain)
940 {
941   GstSchedule *sched = chain->sched;
942
943   // remove the chain from the schedules' list of chains
944   chain->sched->chains = g_list_remove (chain->sched->chains, chain);
945   chain->sched->num_chains--;
946
947   // destroy the chain
948   g_list_free (chain->disabled);        // should be empty...
949   g_list_free (chain->elements);        // ditto
950   g_free (chain);
951
952   GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p",chain,sched->num_chains,sched);
953 }
954
955 static void
956 gst_schedule_chain_add_element (GstScheduleChain *chain, GstElement *element)
957 {
958   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),chain);
959
960   // set the sched pointer for the element
961   element->sched = chain->sched;
962
963   // add the element to the list of 'disabled' elements
964   chain->disabled = g_list_prepend (chain->disabled, element);
965   chain->num_elements++;
966 }
967
968 static void
969 gst_schedule_chain_enable_element (GstScheduleChain *chain, GstElement *element)
970 {
971   GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
972
973   // remove from disabled list
974   chain->disabled = g_list_remove (chain->disabled, element);
975
976   // add to elements list
977   chain->elements = g_list_prepend (chain->elements, element);
978
979   // reschedule the chain
980   gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
981 }
982
983 static void
984 gst_schedule_chain_disable_element (GstScheduleChain *chain, GstElement *element)
985 {
986   GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
987
988   // remove from elements list
989   chain->elements = g_list_remove (chain->elements, element);
990
991   // add to disabled list
992   chain->disabled = g_list_prepend (chain->disabled, element);
993
994   // reschedule the chain
995 // FIXME this should be done only if manager state != NULL
996 //  gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
997 }
998
999 static void
1000 gst_schedule_chain_remove_element (GstScheduleChain *chain, GstElement *element)
1001 {
1002   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),chain);
1003
1004   // if it's active, deactivate it
1005   if (g_list_find (chain->elements, element)) {
1006     gst_schedule_chain_disable_element (chain, element);
1007   }
1008
1009   // remove the element from the list of elements
1010   chain->disabled = g_list_remove (chain->disabled, element);
1011   chain->num_elements--;
1012
1013   // if there are no more elements in the chain, destroy the chain
1014   if (chain->num_elements == 0)
1015     gst_schedule_chain_destroy(chain);
1016
1017   // unset the sched pointer for the element
1018   element->sched = NULL;
1019 }
1020
1021 static void
1022 gst_schedule_chain_elements (GstSchedule *sched, GstElement *element1, GstElement *element2)
1023 {
1024   GList *chains;
1025   GstScheduleChain *chain;
1026   GstScheduleChain *chain1 = NULL, *chain2 = NULL;
1027   GstElement *element;
1028
1029   // first find the chains that hold the two 
1030   chains = sched->chains;
1031   while (chains) {
1032     chain = (GstScheduleChain *)(chains->data);
1033     chains = g_list_next(chains);
1034
1035     if (g_list_find (chain->disabled,element1))
1036       chain1 = chain;
1037     else if (g_list_find (chain->elements,element1))
1038       chain1 = chain;
1039
1040     if (g_list_find (chain->disabled,element2))
1041       chain2 = chain;
1042     else if (g_list_find (chain->elements,element2))
1043       chain2 = chain;
1044   }
1045
1046   // first check to see if they're in the same chain, we're done if that's the case
1047   if ((chain1 != NULL) && (chain1 == chain2)) {
1048     GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
1049     return;
1050   }
1051
1052   // now, if neither element has a chain, create one
1053   if ((chain1 == NULL) && (chain2 == NULL)) {
1054     GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
1055     chain = gst_schedule_chain_new (sched);
1056     gst_schedule_chain_add_element (chain, element1);
1057     gst_schedule_chain_add_element (chain, element2);
1058     // FIXME chain changed here
1059 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1060
1061   // otherwise if both have chains already, join them
1062   } else if ((chain1 != NULL) && (chain2 != NULL)) {
1063     GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p",chain2,chain1);
1064     // take the contents of chain2 and merge them into chain1
1065     chain1->disabled = g_list_concat (chain1->disabled, g_list_copy(chain2->disabled));
1066     chain1->elements = g_list_concat (chain1->elements, g_list_copy(chain2->elements));
1067     chain1->num_elements += chain2->num_elements;
1068     // FIXME chain changed here
1069 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1070
1071     gst_schedule_chain_destroy(chain2);
1072
1073   // otherwise one has a chain already, the other doesn't
1074   } else {
1075     // pick out which one has the chain, and which doesn't
1076     if (chain1 != NULL) chain = chain1, element = element2;
1077     else chain = chain2, element = element1;
1078
1079     GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
1080     gst_schedule_chain_add_element (chain, element);
1081     // FIXME chain changed here
1082 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1083   }
1084 }
1085
1086 void
1087 gst_schedule_pad_connect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1088 {
1089   GstElement *srcelement,*sinkelement;
1090
1091   srcelement = GST_PAD_PARENT(srcpad);
1092   g_return_if_fail(srcelement != NULL);
1093   sinkelement = GST_PAD_PARENT(sinkpad);
1094   g_return_if_fail(sinkelement != NULL);
1095
1096   GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",GST_DEBUG_PAD_NAME(srcpad),GST_DEBUG_PAD_NAME(sinkpad));
1097   GST_DEBUG(GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
1098 GST_ELEMENT_SCHED(srcelement),GST_ELEMENT_SCHED(sinkelement));
1099
1100   if (GST_ELEMENT_SCHED(srcelement) == GST_ELEMENT_SCHED(sinkelement)) {
1101     GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same schedule, chaining together",GST_DEBUG_PAD_NAME(sinkpad));
1102     gst_schedule_chain_elements (sched, srcelement, sinkelement);
1103   }
1104 }
1105
1106 // find the chain within the schedule that holds the element, if any
1107 static GstScheduleChain *
1108 gst_schedule_find_chain (GstSchedule *sched, GstElement *element)
1109 {
1110   GList *chains;
1111   GstScheduleChain *chain;
1112
1113   GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",GST_ELEMENT_NAME(element));
1114
1115   chains = sched->chains;
1116   while (chains) {
1117     chain = (GstScheduleChain *)(chains->data);
1118     chains = g_list_next (chains);
1119
1120     if (g_list_find (chain->elements, element))
1121       return chain;
1122     if (g_list_find (chain->disabled, element))
1123       return chain;
1124   }
1125
1126   return NULL;
1127 }
1128
1129 static void
1130 gst_schedule_chain_recursive_add (GstScheduleChain *chain, GstElement *element)
1131 {
1132   GList *pads;
1133   GstPad *pad;
1134   GstElement *peerelement;
1135
1136   // add the element to the chain
1137   gst_schedule_chain_add_element (chain, element);
1138
1139   GST_DEBUG(GST_CAT_SCHEDULING, "recursing on element \"%s\"\n",GST_ELEMENT_NAME(element));
1140   // now go through all the pads and see which peers can be added
1141   pads = element->pads;
1142   while (pads) {
1143     pad = GST_PAD(pads->data);
1144     pads = g_list_next (pads);
1145
1146     GST_DEBUG(GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",GST_DEBUG_PAD_NAME(pad));
1147     // if the peer exists and could be in the same chain
1148     if (GST_PAD_PEER(pad)) {
1149       GST_DEBUG(GST_CAT_SCHEDULING, "has peer %s:%s\n",GST_DEBUG_PAD_NAME(GST_PAD_PEER(pad)));
1150       peerelement = GST_PAD_PARENT(GST_PAD_PEER(pad));
1151       if (GST_ELEMENT_SCHED(GST_PAD_PARENT(pad)) == GST_ELEMENT_SCHED(peerelement)) {
1152         GST_DEBUG(GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",GST_ELEMENT_NAME(peerelement));
1153         // if it's not already in a chain, add it to this one
1154         if (gst_schedule_find_chain (chain->sched, peerelement) == NULL) {
1155           gst_schedule_chain_recursive_add (chain, peerelement);
1156         }
1157       }
1158     }
1159   }
1160 }
1161
1162 void
1163 gst_schedule_pad_disconnect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1164 {
1165   GstScheduleChain *chain;
1166   GstElement *element1, *element2;
1167   GstScheduleChain *chain1, *chain2;
1168
1169   GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
1170             GST_DEBUG_PAD_NAME(srcpad), GST_DEBUG_PAD_NAME(sinkpad));
1171
1172   // we need to have the parent elements of each pad
1173   element1 = GST_ELEMENT(GST_PAD_PARENT(srcpad));
1174   element2 = GST_ELEMENT(GST_PAD_PARENT(sinkpad));
1175
1176   // first task is to remove the old chain they belonged to.
1177   // this can be accomplished by taking either of the elements,
1178   // since they are guaranteed to be in the same chain
1179   // FIXME is it potentially better to make an attempt at splitting cleaner??
1180   chain = gst_schedule_find_chain (sched, element1);
1181   if (chain) {
1182     GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1183     gst_schedule_chain_destroy (chain);
1184   }
1185
1186   // now create a new chain to hold element1 and build it from scratch
1187   chain1 = gst_schedule_chain_new (sched);
1188   gst_schedule_chain_recursive_add (chain1, element1);
1189
1190   // check the other element to see if it landed in the newly created chain
1191   if (gst_schedule_find_chain (sched, element2) == NULL) {
1192     // if not in chain, create chain and build from scratch
1193     chain2 = gst_schedule_chain_new (sched);
1194     gst_schedule_chain_recursive_add (chain2, element2);
1195   }
1196 }
1197
1198 GstPad*
1199 gst_schedule_pad_select (GstSchedule *sched, GList *padlist)
1200 {
1201   GstPad *pad = NULL;
1202   GList *padlist2 = padlist;
1203   GST_INFO (GST_CAT_SCHEDULING, "performing select");
1204
1205   while (padlist2) {
1206     pad = GST_PAD (padlist2->data);
1207
1208     if (gst_pad_peek (pad)) {
1209       g_print ("found something in pad %s:%s\n", GST_DEBUG_PAD_NAME (pad));
1210       return pad;
1211     }
1212     
1213     padlist2 = g_list_next (padlist2);
1214   }
1215
1216   /* else there is nothing ready to consume, set up the select functions */
1217   while (padlist) {
1218     pad = GST_PAD (padlist->data);
1219
1220     GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_select_proxy);
1221
1222     padlist = g_list_next (padlist);
1223   }
1224   if (pad != NULL) {
1225     GstRealPad *peer = GST_RPAD_PEER(pad);
1226     
1227     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1228
1229     g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
1230     pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1231
1232     g_assert (pad != NULL);
1233     g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
1234   }
1235   return pad;
1236 }
1237
1238 void
1239 gst_schedule_add_element (GstSchedule *sched, GstElement *element)
1240 {
1241   GList *pads;
1242   GstPad *pad;
1243   GstElement *peerelement;
1244   GstScheduleChain *chain;
1245
1246   g_return_if_fail (element != NULL);
1247   g_return_if_fail (GST_IS_ELEMENT(element));
1248
1249   // if it's already in this schedule, don't bother doing anything
1250   if (GST_ELEMENT_SCHED(element) == sched) return;
1251
1252   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to schedule",
1253     GST_ELEMENT_NAME(element));
1254
1255   // if the element already has a different scheduler, remove the element from it
1256   if (GST_ELEMENT_SCHED(element)) {
1257     gst_schedule_remove_element(GST_ELEMENT_SCHED(element),element);
1258   }
1259
1260   // set the sched pointer in the element itself
1261   GST_ELEMENT_SCHED(element) = sched;
1262
1263   // only deal with elements after this point, not bins
1264   // exception is made for Bin's that are schedulable, like the autoplugger
1265   if (GST_IS_BIN (element) && !GST_FLAG_IS_SET(element, GST_BIN_SELF_SCHEDULABLE)) return;
1266
1267   // first add it to the list of elements that are to be scheduled
1268   sched->elements = g_list_prepend (sched->elements, element);
1269   sched->num_elements++;
1270
1271   // create a chain to hold it, and add
1272   chain = gst_schedule_chain_new (sched);
1273   gst_schedule_chain_add_element (chain, element);
1274
1275   // set the sched pointer in all the pads
1276   pads = element->pads;
1277   while (pads) {
1278     pad = GST_PAD(pads->data);
1279     pads = g_list_next(pads);
1280
1281     // we only operate on real pads
1282     if (!GST_IS_REAL_PAD(pad)) continue;
1283
1284     // set the pad's sched pointer
1285     gst_pad_set_sched (pad, sched);
1286
1287     // if the peer element exists and is a candidate
1288     if (GST_PAD_PEER(pad)) {
1289       peerelement = GST_PAD_PARENT( GST_PAD_PEER (pad) );
1290       if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
1291         GST_INFO (GST_CAT_SCHEDULING, "peer is in same schedule, chaining together");
1292         // make sure that the two elements are in the same chain
1293         gst_schedule_chain_elements (sched,element,peerelement);
1294       }
1295     }
1296   }
1297 }
1298
1299 void
1300 gst_schedule_enable_element (GstSchedule *sched, GstElement *element)
1301 {
1302   GstScheduleChain *chain;
1303
1304   // find the chain the element's in
1305   chain = gst_schedule_find_chain (sched, element);
1306
1307   if (chain)
1308     gst_schedule_chain_enable_element (chain, element);
1309   else
1310     GST_INFO (GST_CAT_SCHEDULING, "element not found in any chain, not enabling");
1311 }
1312
1313 void
1314 gst_schedule_disable_element (GstSchedule *sched, GstElement *element)
1315 {
1316   GstScheduleChain *chain;
1317
1318   // find the chain the element is in
1319   chain = gst_schedule_find_chain (sched, element);
1320
1321   // remove it from the chain
1322   if (chain) {
1323     gst_schedule_chain_disable_element(chain,element);
1324   }
1325 }
1326
1327 void
1328 gst_schedule_remove_element (GstSchedule *sched, GstElement *element)
1329 {
1330   GstScheduleChain *chain;
1331
1332   g_return_if_fail (element != NULL);
1333   g_return_if_fail (GST_IS_ELEMENT(element));
1334
1335   if (g_list_find (sched->elements, element)) {
1336     GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from schedule",
1337       GST_ELEMENT_NAME(element));
1338
1339     // find what chain the element is in
1340     chain = gst_schedule_find_chain(sched, element);
1341
1342     // remove it from its chain
1343     gst_schedule_chain_remove_element (chain, element);
1344
1345     // remove it from the list of elements
1346     sched->elements = g_list_remove (sched->elements, element);
1347     sched->num_elements--;
1348
1349     // unset the scheduler pointer in the element
1350     GST_ELEMENT_SCHED(element) = NULL;
1351   }
1352 }
1353
1354 gboolean
1355 gst_schedule_iterate (GstSchedule *sched)
1356 {
1357   GstBin *bin = GST_BIN(sched->parent);
1358   GList *chains;
1359   GstScheduleChain *chain;
1360   GstElement *entry;
1361   gint num_scheduled = 0;
1362   gboolean eos = FALSE;
1363   GList *elements;
1364
1365   GST_DEBUG_ENTER("(\"%s\")", GST_ELEMENT_NAME (bin));
1366
1367   g_return_val_if_fail (bin != NULL, TRUE);
1368   g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1369 //  g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
1370
1371   // step through all the chains
1372   chains = sched->chains;
1373 //  if (chains == NULL) return FALSE;
1374 g_return_val_if_fail (chains != NULL, FALSE);
1375   while (chains) {
1376     chain = (GstScheduleChain *)(chains->data);
1377     chains = g_list_next (chains);
1378
1379 //    if (!chain->need_scheduling) continue;
1380
1381 //    if (chain->need_cothreads) {
1382       // all we really have to do is switch to the first child
1383       // FIXME this should be lots more intelligent about where to start
1384       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via cothreads\n");
1385
1386       if (chain->elements) {
1387         entry = NULL; //MattH ADDED?
1388 GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_elements);
1389         elements = chain->elements;
1390         while (elements) {
1391           entry = GST_ELEMENT(elements->data);
1392           elements = g_list_next(elements);
1393           if (GST_FLAG_IS_SET(entry,GST_ELEMENT_DECOUPLED)) {
1394             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is DECOUPLED, skipping\n",GST_ELEMENT_NAME(entry));
1395             entry = NULL;
1396           } else if (GST_FLAG_IS_SET(entry,GST_ELEMENT_NO_ENTRY)) {
1397             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is not valid, skipping\n",GST_ELEMENT_NAME(entry));
1398             entry = NULL;
1399           } else
1400             break;
1401         }
1402         if (entry) {
1403           GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1404           GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1405                GST_ELEMENT_NAME (entry),entry);
1406           cothread_switch (entry->threadstate);
1407
1408           // following is a check to see if the chain was interrupted due to a
1409           // top-half state_change().  (i.e., if there's a pending state.)
1410           //
1411           // if it was, return to gstthread.c::gst_thread_main_loop() to
1412           // execute the state change.
1413           GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n");
1414           if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_VOID_PENDING)
1415           {
1416             GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n",
1417                        GST_STATE_PENDING(GST_SCHEDULE(sched)->parent));
1418             return 0;
1419           }
1420
1421         } else {
1422           GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
1423           //eos = TRUE;
1424         }
1425       } else {
1426         GST_INFO (GST_CAT_DATAFLOW,"NO ENABLED ELEMENTS IN CHAIN!!");
1427         //eos = TRUE;
1428       }
1429
1430 /*                
1431     } else {
1432       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via chain-functions\n");
1433
1434       entries = chain->entries;
1435          
1436       g_assert (entries != NULL);
1437      
1438       while (entries) {
1439         entry = GST_ELEMENT (entries->data);
1440         entries = g_list_next (entries);
1441  
1442         GST_DEBUG (GST_CAT_DATAFLOW,"have entry \"%s\"\n",GST_ELEMENT_NAME (entry));
1443   
1444         if (GST_IS_BIN (entry)) {
1445           gst_bin_iterate (GST_BIN (entry));
1446         } else {
1447           pads = entry->pads;
1448           while (pads) {
1449             pad = GST_PAD (pads->data);
1450             if (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) {
1451               GST_DEBUG (GST_CAT_DATAFLOW,"calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
1452               if (GST_REAL_PAD(pad)->getfunc == NULL) 
1453                 fprintf(stderr, "error, no getfunc in \"%s\"\n", GST_ELEMENT_NAME  (entry));
1454               else
1455                 buf = (GST_REAL_PAD(pad)->getfunc)(pad);
1456               if (buf) gst_pad_push(pad,buf);
1457             }
1458             pads = g_list_next (pads);
1459           }
1460         }
1461       }
1462     }*/
1463     num_scheduled++;
1464   }
1465
1466 /*
1467   // check if nothing was scheduled that was ours..
1468   if (!num_scheduled) {
1469     // are there any other elements that are still busy?
1470     if (bin->num_eos_providers) {
1471       GST_LOCK (bin);
1472       GST_DEBUG (GST_CATA_DATAFLOW,"waiting for eos providers\n");
1473       g_cond_wait (bin->eoscond, GST_OBJECT(bin)->lock);  
1474       GST_DEBUG (GST_CAT_DATAFLOW,"num eos providers %d\n", bin->num_eos_providers);
1475       GST_UNLOCK (bin);
1476     }
1477     else {      
1478       gst_element_signal_eos (GST_ELEMENT (bin));
1479       eos = TRUE;
1480     }       
1481   }
1482 */
1483
1484   GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
1485   return !eos;
1486 }
1487
1488
1489
1490 void
1491 gst_schedule_show (GstSchedule *sched)
1492 {
1493   GList *chains, *elements;
1494   GstElement *element;
1495   GstScheduleChain *chain;
1496
1497   if (sched == NULL) {
1498     g_print("schedule doesn't exist for this element\n");
1499     return;
1500   }
1501
1502   g_return_if_fail(GST_IS_SCHEDULE(sched));
1503
1504   g_print("SCHEDULE DUMP FOR MANAGING BIN \"%s\"\n",GST_ELEMENT_NAME(sched->parent));
1505
1506   g_print("schedule has %d elements in it: ",sched->num_elements);
1507   elements = sched->elements;
1508   while (elements) {
1509     element = GST_ELEMENT(elements->data);
1510     elements = g_list_next(elements);
1511
1512     g_print("%s, ",GST_ELEMENT_NAME(element));
1513   }
1514   g_print("\n");
1515
1516   g_print("schedule has %d chains in it\n",sched->num_chains);
1517   chains = sched->chains;
1518   while (chains) {
1519     chain = (GstScheduleChain *)(chains->data);
1520     chains = g_list_next(chains);
1521
1522     g_print("%p: ",chain);
1523
1524     elements = chain->disabled;
1525     while (elements) {
1526       element = GST_ELEMENT(elements->data);
1527       elements = g_list_next(elements);
1528
1529       g_print("!%s, ",GST_ELEMENT_NAME(element));
1530     }
1531
1532     elements = chain->elements;
1533     while (elements) {
1534       element = GST_ELEMENT(elements->data);
1535       elements = g_list_next(elements);
1536
1537       g_print("%s, ",GST_ELEMENT_NAME(element));
1538     }
1539     g_print("\n");
1540   }
1541 }