Properly set the eos flag when no elements can be scheduled.
[platform/upstream/gstreamer.git] / gst / gstscheduler.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstscheduler.c: Default scheduling code for most cases
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
25
26 #include "gstscheduler.h"
27
28
29 static int
30 gst_schedule_loopfunc_wrapper (int argc,char *argv[])
31 {
32   GstElement *element = GST_ELEMENT (argv);
33   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
34
35   GST_DEBUG_ENTER("(%d,'%s')",argc,name);
36
37   do {
38     GST_DEBUG (GST_CAT_DATAFLOW,"calling loopfunc %s for element %s\n",
39                GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40     (element->loopfunc) (element);
41     GST_DEBUG (GST_CAT_DATAFLOW,"element %s ended loop function\n", name);
42   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
43   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
44
45   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
46   return 0;
47 }
48
49 static int
50 gst_schedule_chain_wrapper (int argc,char *argv[])
51 {
52   GstElement *element = GST_ELEMENT (argv);
53   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
54   GList *pads;
55   GstPad *pad;
56   GstRealPad *realpad;
57   GstBuffer *buf;
58
59   GST_DEBUG_ENTER("(\"%s\")",name);
60
61   GST_DEBUG (GST_CAT_DATAFLOW,"stepping through pads\n");
62   do {
63     pads = element->pads;
64     while (pads) {
65       pad = GST_PAD (pads->data);
66       pads = g_list_next (pads);
67       if (!GST_IS_REAL_PAD(pad)) continue;
68       realpad = GST_REAL_PAD(pad);
69       if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SINK) {
70         GST_DEBUG (GST_CAT_DATAFLOW,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
71         buf = gst_pad_pull (pad);
72         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
73         if (buf) GST_RPAD_CHAINFUNC(realpad) (pad,buf);
74         GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
75       }
76     }
77   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
78   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
79
80   GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
81   return 0;
82 }
83
84 static int
85 gst_schedule_src_wrapper (int argc,char *argv[])
86 {
87   GstElement *element = GST_ELEMENT (argv);
88   GList *pads;
89   GstRealPad *realpad;
90   GstBuffer *buf = NULL;
91   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
92
93   GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
94
95   do {
96     pads = element->pads;
97     while (pads) {
98       if (!GST_IS_REAL_PAD(pads->data)) continue;
99       realpad = (GstRealPad*)(pads->data);
100       pads = g_list_next(pads);
101       if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
102         GST_DEBUG (GST_CAT_DATAFLOW,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
103         if (realpad->regiontype != GST_REGION_VOID) {
104           g_return_val_if_fail (GST_RPAD_GETREGIONFUNC(realpad) != NULL, 0);
105 //          if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
106 //            fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
107 //          else
108           buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad,realpad->regiontype,realpad->offset,realpad->len);
109           realpad->regiontype = GST_REGION_VOID;
110         } else {
111           g_return_val_if_fail (GST_RPAD_GETFUNC(realpad) != NULL, 0);
112 //          if (GST_RPAD_GETFUNC(realpad) == NULL)
113 //            fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
114 //          else
115           buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
116         }
117
118         GST_DEBUG (GST_CAT_DATAFLOW,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
119         gst_pad_push ((GstPad*)realpad, buf);
120       }
121     }
122   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
123   GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
124
125   GST_DEBUG_LEAVE("");
126   return 0;
127 }
128
129 static void
130 gst_schedule_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
131 {
132   GstRealPad *peer = GST_RPAD_PEER(pad);
133
134   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
135   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
136
137   // FIXME this should be bounded
138   // loop until the bufferpen is empty so we can fill it up again
139   while (GST_RPAD_BUFPEN(pad) != NULL) {
140     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
141                GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
142     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
143
144     // we may no longer be the same pad, check.
145     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
146       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
147       pad = (GstPad *)GST_RPAD_PEER(peer);
148     }
149   }
150
151   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
152   // now fill the bufferpen and switch so it can be consumed
153   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
154   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
155   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
156
157   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
158 }
159
160 static void
161 gst_schedule_select_proxy (GstPad *pad, GstBuffer *buf)
162 {
163   GstRealPad *peer = GST_RPAD_PEER(pad);
164
165   g_print ("select proxy (%s:%s)\n",GST_DEBUG_PAD_NAME(pad));
166
167   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
168   GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
169
170   g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
171   // now fill the bufferpen and switch so it can be consumed
172   GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
173   GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
174   g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
175   GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
176   cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
177
178   g_print ("done switching\n");
179   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
180 }
181
182
183 static GstBuffer*
184 gst_schedule_pullfunc_proxy (GstPad *pad)
185 {
186   GstBuffer *buf;
187   GstRealPad *peer = GST_RPAD_PEER(pad);
188
189   GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
190
191   // FIXME this should be bounded
192   // we will loop switching to the peer until it's filled up the bufferpen
193   while (GST_RPAD_BUFPEN(pad) == NULL) {
194     GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
195                GST_ELEMENT_NAME(GST_ELEMENT(GST_PAD_PARENT(pad))),
196                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
197     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
198
199     // we may no longer be the same pad, check.
200     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
201       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
202       pad = (GstPad *)GST_RPAD_PEER(peer);
203     }
204   }
205   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
206
207   // now grab the buffer from the pen, clear the pen, and return the buffer
208   buf = GST_RPAD_BUFPEN(pad);
209   GST_RPAD_BUFPEN(pad) = NULL;
210   return buf;
211 }
212
213 static GstBuffer*
214 gst_schedule_pullregionfunc_proxy (GstPad *pad,GstRegionType type,guint64 offset,guint64 len)
215 {
216   GstBuffer *buf;
217   GstRealPad *peer = GST_RPAD_PEER(pad);
218
219   GST_DEBUG_ENTER("%s:%s,%d,%lld,%lld",GST_DEBUG_PAD_NAME(pad),type,offset,len);
220
221   // put the region info into the pad
222   GST_RPAD_REGIONTYPE(pad) = type;
223   GST_RPAD_OFFSET(pad) = offset;
224   GST_RPAD_LEN(pad) = len;
225
226   // FIXME this should be bounded
227   // we will loop switching to the peer until it's filled up the bufferpen
228   while (GST_RPAD_BUFPEN(pad) == NULL) {
229     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
230                GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
231     cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
232
233     // we may no longer be the same pad, check.
234     if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
235       GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
236       pad = (GstPad *)GST_RPAD_PEER(peer);
237     }
238   }
239   GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
240
241   // now grab the buffer from the pen, clear the pen, and return the buffer
242   buf = GST_RPAD_BUFPEN(pad);
243   GST_RPAD_BUFPEN(pad) = NULL;
244   return buf;
245 }
246
247
248 static void
249 gst_schedule_cothreaded_chain (GstBin *bin, GstScheduleChain *chain) {
250   GList *elements;
251   GstElement *element;
252   cothread_func wrapper_function;
253   GList *pads;
254   GstPad *pad;
255
256   GST_DEBUG (GST_CAT_SCHEDULING,"chain is using COTHREADS\n");
257
258   // first create thread context
259   if (bin->threadcontext == NULL) {
260     GST_DEBUG (GST_CAT_SCHEDULING,"initializing cothread context\n");
261     bin->threadcontext = cothread_init ();
262   }
263
264   // walk through all the chain's elements
265   elements = chain->elements;
266   while (elements) {
267     element = GST_ELEMENT (elements->data);
268     elements = g_list_next (elements);
269
270     // start out without a wrapper function, we select it later
271     wrapper_function = NULL;
272
273     // if the element has a loopfunc...
274     if (element->loopfunc != NULL) {
275       wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_loopfunc_wrapper);
276       GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
277     } else {
278       // otherwise we need to decide what kind of cothread
279       // if it's not DECOUPLED, we decide based on whether it's a source or not
280       if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
281         // if it doesn't have any sinks, it must be a source (duh)
282         if (element->numsinkpads == 0) {
283           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_src_wrapper);
284           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
285         } else {
286           wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_chain_wrapper);
287           GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
288         }
289       }
290     }
291
292     // now we have to walk through the pads to set up their state
293     pads = gst_element_get_pad_list (element);
294     while (pads) {
295       pad = GST_PAD (pads->data);
296       pads = g_list_next (pads);
297       if (!GST_IS_REAL_PAD(pad)) continue;
298
299       // if the element is DECOUPLED or outside the manager, we have to chain
300       if ((wrapper_function == NULL) ||
301           (GST_RPAD_PEER(pad) &&
302            (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
303          ) {
304         // set the chain proxies
305         if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
306           GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
307           GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
308         } else {
309           GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
310           GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
311           GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
312         }
313
314       // otherwise we really are a cothread
315       } else {
316         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
317           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
318           GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pushfunc_proxy);
319         } else {
320           GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
321           GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullfunc_proxy);
322           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullregionfunc_proxy);
323         }
324       }
325     }
326
327     // need to set up the cothread now
328     if (wrapper_function != NULL) {
329       if (element->threadstate == NULL) {
330         // FIXME handle cothread_create returning NULL
331         element->threadstate = cothread_create (bin->threadcontext);
332         GST_DEBUG (GST_CAT_SCHEDULING,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
333       }
334       cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
335       GST_DEBUG (GST_CAT_SCHEDULING,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
336             GST_DEBUG_FUNCPTR_NAME(wrapper_function));
337     }
338   }
339 }
340
341 G_GNUC_UNUSED static void
342 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
343   GList *elements;
344   GstElement *element;
345   GList *pads;
346   GstPad *pad;
347
348   GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
349   // walk through all the elements
350   elements = chain->elements;
351   while (elements) {
352     element = GST_ELEMENT (elements->data);
353     elements = g_list_next (elements);
354
355     // walk through all the pads
356     pads = gst_element_get_pad_list (element);
357     while (pads) {
358       pad = GST_PAD (pads->data);
359       pads = g_list_next (pads);
360       if (!GST_IS_REAL_PAD(pad)) continue;
361
362       if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
363         GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
364         GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
365       } else {
366         GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
367         GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
368         GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
369       }
370     }
371   }
372 }
373
374 /* depracated!! */
375 static void
376 gst_bin_schedule_cleanup (GstBin *bin)
377 {
378   GList *chains;
379   _GstBinChain *chain;
380
381   chains = bin->chains;
382   while (chains) {
383     chain = (_GstBinChain *)(chains->data);
384     chains = g_list_next(chains);
385
386 //    g_list_free(chain->disabled);
387     g_list_free(chain->elements);
388     g_list_free(chain->entries);
389
390     g_free(chain);
391   }
392   g_list_free(bin->chains);
393
394   bin->chains = NULL;
395 }
396
397 static void
398 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
399 {
400   GST_DEBUG (GST_CAT_SCHEDULING,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
401   chain->need_scheduling = FALSE;
402 }
403
404 /*
405 void gst_bin_schedule_func(GstBin *bin) {
406   GList *elements;
407   GstElement *element;
408   GSList *pending = NULL;
409   GList *pads;
410   GstPad *pad;
411   GstElement *peerparent;
412   GList *chains;
413   GstScheduleChain *chain;
414
415   GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
416
417   gst_bin_schedule_cleanup(bin);
418
419   // next we have to find all the separate scheduling chains
420   GST_DEBUG (GST_CAT_SCHEDULING,"attempting to find scheduling chains...\n");
421   // first make a copy of the managed_elements we can mess with
422   elements = g_list_copy (bin->managed_elements);
423   // we have to repeat until the list is empty to get all chains
424   while (elements) {
425     element = GST_ELEMENT (elements->data);
426
427     // if this is a DECOUPLED element
428     if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
429       // skip this element entirely
430       GST_DEBUG (GST_CAT_SCHEDULING,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
431       elements = g_list_next (elements);
432       continue;
433     }
434
435     GST_DEBUG (GST_CAT_SCHEDULING,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
436
437     // prime the pending list with the first element off the top
438     pending = g_slist_prepend (NULL, element);
439     // and remove that one from the main list
440     elements = g_list_remove (elements, element);
441
442     // create a chain structure
443     chain = g_new0 (_GstBinChain, 1);
444     chain->need_scheduling = TRUE;
445
446     // for each pending element, walk the pipeline
447     do {
448       // retrieve the top of the stack and pop it
449       element = GST_ELEMENT (pending->data);
450       pending = g_slist_remove (pending, element);
451
452       // add ourselves to the chain's list of elements
453       GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
454       chain->elements = g_list_prepend (chain->elements, element);
455       chain->num_elements++;
456       gtk_signal_connect (G_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
457       // set the cothreads flag as appropriate
458       if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
459         chain->need_cothreads = TRUE;
460       if (bin->use_cothreads == TRUE)
461         chain->need_cothreads = TRUE;
462
463       // if we're managed by the current bin, and we're not decoupled,
464       // go find all the peers and add them to the list of elements to check
465       if ((element->manager == GST_ELEMENT(bin)) &&
466           !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
467         // remove ourselves from the outer list of all managed elements
468 //        GST_DEBUG (GST_CAT_SCHEDULING,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
469         elements = g_list_remove (elements, element);
470
471         // if this element is a source, add it as an entry
472         if (element->numsinkpads == 0) {
473           chain->entries = g_list_prepend (chain->entries, element);
474           GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
475         }
476
477         // now we have to walk the pads to find peers
478         pads = gst_element_get_pad_list (element);
479         while (pads) {
480           pad = GST_PAD (pads->data);
481           pads = g_list_next (pads);
482           if (!GST_IS_REAL_PAD(pad)) continue;
483           GST_DEBUG (GST_CAT_SCHEDULING,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
484
485           if (GST_RPAD_PEER(pad) == NULL) continue;
486           if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
487           g_assert(GST_RPAD_PEER(pad) != NULL);
488           g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
489
490           peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
491
492           GST_DEBUG (GST_CAT_SCHEDULING,"peer pad %p\n", GST_RPAD_PEER(pad));
493           // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
494           // only add it if it's in the list of un-visited elements still
495           if ((g_list_find (elements, peerparent) != NULL) ||
496               GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
497             // add the peer element to the pending list
498             GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to list of pending elements\n",
499                        GST_ELEMENT_NAME(peerparent));
500             pending = g_slist_prepend (pending, peerparent);
501
502             // if this is a sink pad, then the element on the other side is an entry
503             if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
504                 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
505               chain->entries = g_list_prepend (chain->entries, peerparent);
506               gtk_signal_connect (G_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
507               GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
508             }
509           } else
510             GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
511         }
512       }
513     } while (pending);
514
515     // add the chain to the bin
516     GST_DEBUG (GST_CAT_SCHEDULING,"have chain with %d elements: ",chain->num_elements);
517     { GList *elements = chain->elements;
518       while (elements) {
519         element = GST_ELEMENT (elements->data);
520         elements = g_list_next(elements);
521         GST_DEBUG_NOPREFIX(GST_CAT_SCHEDULING,"%s, ",GST_ELEMENT_NAME(element));
522       }
523     }
524     GST_DEBUG_NOPREFIX(GST_CAT_DATAFLOW,"\n");
525     bin->chains = g_list_prepend (bin->chains, chain);
526     bin->num_chains++;
527   }
528   // free up the list in case it's full of DECOUPLED elements
529   g_list_free (elements);
530
531   GST_DEBUG (GST_CAT_SCHEDULING,"\nwe have %d chains to schedule\n",bin->num_chains);
532
533   // now we have to go through all the chains and schedule them
534   chains = bin->chains;
535   while (chains) {
536     chain = (GstScheduleChain *)(chains->data);
537     chains = g_list_next (chains);
538
539     // schedule as appropriate
540     if (chain->need_cothreads) {
541       gst_schedule_cothreaded_chain (bin,chain);
542     } else {
543       gst_schedule_chained_chain (bin,chain);
544     }
545   }
546
547   GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
548 }
549 */
550
551
552 /*
553         // ***** check for possible connections outside
554         // get the pad's peer
555         peer = gst_pad_get_peer (pad);
556         // FIXME this should be an error condition, if not disabled
557         if (!peer) break;
558         // get the parent of the peer of the pad
559         outside = GST_ELEMENT (gst_pad_get_parent (peer));
560         // FIXME this should *really* be an error condition
561         if (!outside) break;
562         // if it's a source or connection and it's not ours...
563         if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
564             (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
565           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
566             GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
567 //            GST_DEBUG (0,"PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
568 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
569 //            pad->pullfunc = pad->peer->pullfunc;
570 //            GST_DEBUG (0,"PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
571 //            pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy);
572             GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
573             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
574           }
575         } else {
576 */
577
578
579
580
581
582 /*
583       } else if (GST_IS_SRC (element)) {
584         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
585         bin->entries = g_list_prepend (bin->entries,element);
586         bin->num_entries++;
587         cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
588       }
589
590       pads = gst_element_get_pad_list (element);
591       while (pads) {
592         pad = GST_PAD(pads->data);
593
594         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
595           GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
596           // set the proxy functions
597           pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
598           GST_DEBUG (0,"pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
599         } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
600           GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
601           // set the proxy functions
602           GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
603           GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
604           GST_DEBUG (0,"pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
605                 &pad->pullfunc,gst_bin_pullfunc_proxy);
606           pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
607         }
608         pads = g_list_next (pads);
609       }
610       elements = g_list_next (elements);
611
612       // if there are no entries, we have to pick one at random
613       if (bin->num_entries == 0)
614         bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
615     }
616   } else {
617     GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
618     // we have to find which elements will drive an iteration
619     elements = bin->children;
620     while (elements) {
621       element = GST_ELEMENT (elements->data);
622       GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
623       if (GST_IS_BIN (element)) {
624         gst_bin_create_plan (GST_BIN (element));
625       }
626       if (GST_IS_SRC (element)) {
627         GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
628         bin->entries = g_list_prepend (bin->entries, element);
629         bin->num_entries++;
630       }
631
632       // go through all the pads, set pointers, and check for connections
633       pads = gst_element_get_pad_list (element);
634       while (pads) {
635         pad = GST_PAD (pads->data);
636
637         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
638           GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
639
640           // copy the peer's chain function, easy enough
641           GST_DEBUG (0,"copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
642           GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
643
644           // need to walk through and check for outside connections
645 //FIXME need to do this for all pads
646           // get the pad's peer
647           peer = GST_RPAD_PEER(pad);
648           if (!peer) {
649             GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
650             break;
651           }
652           // get the parent of the peer of the pad
653           outside = GST_ELEMENT (GST_RPAD_PARENT(peer));
654           if (!outside) break;
655           // if it's a connection and it's not ours...
656           if (GST_IS_CONNECTION (outside) &&
657                (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
658             gst_info("gstbin: element \"%s\" is the external source Connection "
659                                     "for internal element \"%s\"\n",
660                           GST_ELEMENT_NAME (GST_ELEMENT (outside)),
661                           GST_ELEMENT_NAME (GST_ELEMENT (element)));
662             bin->entries = g_list_prepend (bin->entries, outside);
663             bin->num_entries++;
664           }
665         }
666         else {
667           GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
668         }
669         pads = g_list_next (pads);
670
671       }
672       elements = g_list_next (elements);
673     }
674 */
675
676
677
678
679 /*
680   // If cothreads are needed, we need to not only find elements but
681   // set up cothread states and various proxy functions.
682   if (bin->need_cothreads) {
683     GST_DEBUG (0,"bin is using cothreads\n");
684
685     // first create thread context
686     if (bin->threadcontext == NULL) {
687       GST_DEBUG (0,"initializing cothread context\n");
688       bin->threadcontext = cothread_init ();
689     }
690
691     // walk through all the children
692     elements = bin->managed_elements;
693     while (elements) {
694       element = GST_ELEMENT (elements->data);
695       elements = g_list_next (elements);
696
697       // start out with a NULL warpper function, we'll set it if we want a cothread
698       wrapper_function = NULL;
699
700       // have to decide if we need to or can use a cothreads, and if so which wrapper
701       // first of all, if there's a loopfunc, the decision's already made
702       if (element->loopfunc != NULL) {
703         wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
704         GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
705       } else {
706         // otherwise we need to decide if it needs a cothread
707         // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
708         if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
709             (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
710              !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
711           // base it on whether we're going to loop through source or sink pads
712           if (element->numsinkpads == 0)
713             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
714           else
715             wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
716         }
717       }
718
719       // walk through the all the pads for this element, setting proxy functions
720       // the selection of proxy functions depends on whether we're in a cothread or not
721       pads = gst_element_get_pad_list (element);
722       while (pads) {
723         pad = GST_PAD (pads->data);
724         pads = g_list_next (pads);
725
726         // check to see if someone else gets to set up the element
727         peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
728         if (peer_manager != GST_ELEMENT(bin)) {
729           GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
730         }
731
732         // if the wrapper_function is set, we need to use the proxy functions
733         if (wrapper_function != NULL) {
734           // set up proxy functions
735           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
736             GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
737             pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
738           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
739             GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
740             GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
741             GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
742           }
743         } else {
744           // otherwise we need to set up for 'traditional' chaining
745           if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
746             // we can just copy the chain function, since it shares the prototype
747             GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
748                   GST_DEBUG_PAD_NAME(pad));
749             pad->pushfunc = pad->chainfunc;
750           } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
751             // we can just copy the get function, since it shares the prototype
752             GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
753                   GST_DEBUG_PAD_NAME(pad));
754             pad->pullfunc = pad->getfunc;
755           }
756         }
757       }
758
759       // if a loopfunc has been specified, create and set up a cothread
760       if (wrapper_function != NULL) {
761         if (element->threadstate == NULL) {
762           element->threadstate = cothread_create (bin->threadcontext);
763           GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
764                 &element->threadstate,GST_ELEMENT_NAME (element));
765         }
766         cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
767         GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
768               GST_DEBUG_FUNCPTR_NAME(wrapper_function));
769       }
770
771 //      // HACK: if the element isn't decoupled, it's an entry
772 //      if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
773 //        bin->entries = g_list_append(bin->entries, element);
774     }
775
776   // otherwise, cothreads are not needed
777   } else {
778     GST_DEBUG (0,"bin is chained, no cothreads needed\n");
779
780     elements = bin->managed_elements;
781     while (elements) {
782       element = GST_ELEMENT (elements->data);
783       elements = g_list_next (elements);
784
785       pads = gst_element_get_pad_list (element);
786       while (pads) {
787         pad = GST_PAD (pads->data);
788         pads = g_list_next (pads);
789
790         if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
791           GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
792           pad->pushfunc = pad->chainfunc;
793         } else {
794           GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
795           pad->pullfunc = pad->getfunc;
796         }
797       }
798     }
799   }
800 */
801
802 static void 
803 gst_schedule_lock_element (GstSchedule *sched,GstElement *element)
804 {
805   if (element->threadstate)
806     cothread_lock(element->threadstate);
807 }
808
809 static void
810 gst_schedule_unlock_element (GstSchedule *sched,GstElement *element)
811 {
812   if (element->threadstate)
813     cothread_unlock(element->threadstate);
814 }
815
816
817 /*************** INCREMENTAL SCHEDULING CODE STARTS HERE ***************/
818
819
820 static void     gst_schedule_class_init (GstScheduleClass *klass);
821 static void     gst_schedule_init       (GstSchedule *schedule);
822
823 static GstObjectClass *parent_class = NULL;
824
825 GType gst_schedule_get_type(void) {
826   static GType schedule_type = 0;
827
828   if (!schedule_type) {
829     static const GTypeInfo schedule_info = {
830       sizeof(GstScheduleClass),
831       NULL,
832       NULL,
833       (GClassInitFunc)gst_schedule_class_init,
834       NULL,
835       NULL,
836       sizeof(GstSchedule),
837       0,
838       (GInstanceInitFunc)gst_schedule_init,
839     };
840     schedule_type = g_type_register_static(GST_TYPE_OBJECT, "GstSchedule", &schedule_info, 0);
841   }
842   return schedule_type;
843 }
844
845 static void
846 gst_schedule_class_init (GstScheduleClass *klass)
847 {
848   parent_class = g_type_class_ref(GST_TYPE_OBJECT);
849 }
850
851 static void
852 gst_schedule_init (GstSchedule *schedule)
853 {
854   schedule->add_element = GST_DEBUG_FUNCPTR(gst_schedule_add_element);
855   schedule->remove_element = GST_DEBUG_FUNCPTR(gst_schedule_remove_element);
856   schedule->enable_element = GST_DEBUG_FUNCPTR(gst_schedule_enable_element);
857   schedule->disable_element = GST_DEBUG_FUNCPTR(gst_schedule_disable_element);
858   schedule->lock_element = GST_DEBUG_FUNCPTR(gst_schedule_lock_element);
859   schedule->unlock_element = GST_DEBUG_FUNCPTR(gst_schedule_unlock_element);
860   schedule->pad_connect = GST_DEBUG_FUNCPTR(gst_schedule_pad_connect);
861   schedule->pad_disconnect = GST_DEBUG_FUNCPTR(gst_schedule_pad_disconnect);
862   schedule->pad_select = GST_DEBUG_FUNCPTR(gst_schedule_pad_select);
863   schedule->iterate = GST_DEBUG_FUNCPTR(gst_schedule_iterate);
864 }
865
866 GstSchedule*
867 gst_schedule_new(GstElement *parent)
868 {
869   GstSchedule *sched = GST_SCHEDULE (g_object_new(GST_TYPE_SCHEDULE,NULL));
870
871   sched->parent = parent;
872
873   return sched;
874 }
875
876
877 /* this function will look at a pad and determine if the peer parent is
878  * a possible candidate for connecting up in the same chain. */
879 /* DEPRACATED !!!!
880 GstElement *gst_schedule_check_pad (GstSchedule *sched, GstPad *pad) {
881   GstRealPad *peer;
882   GstElement *element, *peerelement;
883
884   GST_INFO (GST_CAT_SCHEDULING, "checking pad %s:%s for peer in scheduler",
885             GST_DEBUG_PAD_NAME(pad));
886
887   element = GST_ELEMENT(GST_PAD_PARENT(peer));
888   GST_DEBUG(GST_CAT_SCHEDULING, "element is \"%s\"\n",GST_ELEMENT_NAME(element));
889
890   peer = GST_PAD_PEER (pad);
891   if (peer == NULL) return NULL;
892   peerelement = GST_ELEMENT(GST_PAD_PARENT (peer));
893   if (peerelement == NULL) return NULL;
894   GST_DEBUG(GST_CAT_SCHEDULING, "peer element is \"%s\"\n",GST_ELEMENT_NAME(peerelement));
895
896   // now check to see if it's in the same schedule
897   if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
898     GST_DEBUG(GST_CAT_SCHEDULING, "peer is in same schedule\n");
899     return peerelement;
900   }
901
902   // otherwise it's not a candidate
903   return NULL;
904 }
905 */
906
907 GstScheduleChain *
908 gst_schedule_chain_new (GstSchedule *sched)
909 {
910   GstScheduleChain *chain = g_new (GstScheduleChain, 1);
911
912   // initialize the chain with sane values
913   chain->sched = sched;
914   chain->disabled = NULL;
915   chain->elements = NULL;
916   chain->num_elements = 0;
917   chain->entry = NULL;
918   chain->cothreaded_elements = 0;
919   chain->schedule = FALSE;
920
921   // add the chain to the schedules' list of chains
922   sched->chains = g_list_prepend (sched->chains, chain);
923   sched->num_chains++;
924
925   GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
926             chain,sched->num_chains,sched);
927
928   return chain;
929 }
930
931 void
932 gst_schedule_chain_destroy (GstScheduleChain *chain)
933 {
934   GstSchedule *sched = chain->sched;
935
936   // remove the chain from the schedules' list of chains
937   chain->sched->chains = g_list_remove (chain->sched->chains, chain);
938   chain->sched->num_chains--;
939
940   // destroy the chain
941   g_list_free (chain->disabled);        // should be empty...
942   g_list_free (chain->elements);        // ditto
943   g_free (chain);
944
945   GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p",chain,sched->num_chains,sched);
946 }
947
948 void
949 gst_schedule_chain_add_element (GstScheduleChain *chain, GstElement *element)
950 {
951   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),chain);
952
953   // set the sched pointer for the element
954   element->sched = chain->sched;
955
956   // add the element to the list of 'disabled' elements
957   chain->disabled = g_list_prepend (chain->disabled, element);
958   chain->num_elements++;
959 }
960
961 void
962 gst_schedule_chain_enable_element (GstScheduleChain *chain, GstElement *element)
963 {
964   GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
965
966   // remove from disabled list
967   chain->disabled = g_list_remove (chain->disabled, element);
968
969   // add to elements list
970   chain->elements = g_list_prepend (chain->elements, element);
971
972   // reschedule the chain
973   gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
974 }
975
976 void
977 gst_schedule_chain_disable_element (GstScheduleChain *chain, GstElement *element)
978 {
979   GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
980
981   // remove from elements list
982   chain->elements = g_list_remove (chain->elements, element);
983
984   // add to disabled list
985   chain->disabled = g_list_prepend (chain->disabled, element);
986
987   // reschedule the chain
988 // FIXME this should be done only if manager state != NULL
989 //  gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
990 }
991
992 void
993 gst_schedule_chain_remove_element (GstScheduleChain *chain, GstElement *element)
994 {
995   GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),chain);
996
997   // if it's active, deactivate it
998   if (g_list_find (chain->elements, element)) {
999     gst_schedule_chain_disable_element (chain, element);
1000   }
1001
1002   // remove the element from the list of elements
1003   chain->disabled = g_list_remove (chain->disabled, element);
1004   chain->num_elements--;
1005
1006   // if there are no more elements in the chain, destroy the chain
1007   if (chain->num_elements == 0)
1008     gst_schedule_chain_destroy(chain);
1009
1010   // unset the sched pointer for the element
1011   element->sched = NULL;
1012 }
1013
1014 void
1015 gst_schedule_chain_elements (GstSchedule *sched, GstElement *element1, GstElement *element2)
1016 {
1017   GList *chains;
1018   GstScheduleChain *chain;
1019   GstScheduleChain *chain1 = NULL, *chain2 = NULL;
1020   GstElement *element;
1021
1022   // first find the chains that hold the two 
1023   chains = sched->chains;
1024   while (chains) {
1025     chain = (GstScheduleChain *)(chains->data);
1026     chains = g_list_next(chains);
1027
1028     if (g_list_find (chain->disabled,element1))
1029       chain1 = chain;
1030     else if (g_list_find (chain->elements,element1))
1031       chain1 = chain;
1032
1033     if (g_list_find (chain->disabled,element2))
1034       chain2 = chain;
1035     else if (g_list_find (chain->elements,element2))
1036       chain2 = chain;
1037   }
1038
1039   // first check to see if they're in the same chain, we're done if that's the case
1040   if ((chain1 != NULL) && (chain1 == chain2)) {
1041     GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
1042     return;
1043   }
1044
1045   // now, if neither element has a chain, create one
1046   if ((chain1 == NULL) && (chain2 == NULL)) {
1047     GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
1048     chain = gst_schedule_chain_new (sched);
1049     gst_schedule_chain_add_element (chain, element1);
1050     gst_schedule_chain_add_element (chain, element2);
1051     // FIXME chain changed here
1052 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1053
1054   // otherwise if both have chains already, join them
1055   } else if ((chain1 != NULL) && (chain2 != NULL)) {
1056     GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p",chain2,chain1);
1057     // take the contents of chain2 and merge them into chain1
1058     chain1->disabled = g_list_concat (chain1->disabled, g_list_copy(chain2->disabled));
1059     chain1->elements = g_list_concat (chain1->elements, g_list_copy(chain2->elements));
1060     chain1->num_elements += chain2->num_elements;
1061     // FIXME chain changed here
1062 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1063
1064     gst_schedule_chain_destroy(chain2);
1065
1066   // otherwise one has a chain already, the other doesn't
1067   } else {
1068     // pick out which one has the chain, and which doesn't
1069     if (chain1 != NULL) chain = chain1, element = element2;
1070     else chain = chain2, element = element1;
1071
1072     GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
1073     gst_schedule_chain_add_element (chain, element);
1074     // FIXME chain changed here
1075 //    gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1076   }
1077 }
1078
1079 void
1080 gst_schedule_pad_connect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1081 {
1082   GstElement *srcelement,*sinkelement;
1083
1084   srcelement = GST_PAD_PARENT(srcpad);
1085   g_return_if_fail(srcelement != NULL);
1086   sinkelement = GST_PAD_PARENT(sinkpad);
1087   g_return_if_fail(sinkelement != NULL);
1088
1089   GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",GST_DEBUG_PAD_NAME(srcpad),GST_DEBUG_PAD_NAME(sinkpad));
1090   GST_DEBUG(GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
1091 GST_ELEMENT_SCHED(srcelement),GST_ELEMENT_SCHED(sinkelement));
1092
1093   if (GST_ELEMENT_SCHED(srcelement) == GST_ELEMENT_SCHED(sinkelement)) {
1094     GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same schedule, chaining together",GST_DEBUG_PAD_NAME(sinkpad));
1095     gst_schedule_chain_elements (sched, srcelement, sinkelement);
1096   }
1097 }
1098
1099 // find the chain within the schedule that holds the element, if any
1100 GstScheduleChain *
1101 gst_schedule_find_chain (GstSchedule *sched, GstElement *element)
1102 {
1103   GList *chains;
1104   GstScheduleChain *chain;
1105
1106   GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",GST_ELEMENT_NAME(element));
1107
1108   chains = sched->chains;
1109   while (chains) {
1110     chain = (GstScheduleChain *)(chains->data);
1111     chains = g_list_next (chains);
1112
1113     if (g_list_find (chain->elements, element))
1114       return chain;
1115     if (g_list_find (chain->disabled, element))
1116       return chain;
1117   }
1118
1119   return NULL;
1120 }
1121
1122 void
1123 gst_schedule_chain_recursive_add (GstScheduleChain *chain, GstElement *element)
1124 {
1125   GList *pads;
1126   GstPad *pad;
1127   GstElement *peerelement;
1128
1129   // add the element to the chain
1130   gst_schedule_chain_add_element (chain, element);
1131
1132   GST_DEBUG(GST_CAT_SCHEDULING, "recursing on element \"%s\"\n",GST_ELEMENT_NAME(element));
1133   // now go through all the pads and see which peers can be added
1134   pads = element->pads;
1135   while (pads) {
1136     pad = GST_PAD(pads->data);
1137     pads = g_list_next (pads);
1138
1139     GST_DEBUG(GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",GST_DEBUG_PAD_NAME(pad));
1140     // if the peer exists and could be in the same chain
1141     if (GST_PAD_PEER(pad)) {
1142       GST_DEBUG(GST_CAT_SCHEDULING, "has peer %s:%s\n",GST_DEBUG_PAD_NAME(GST_PAD_PEER(pad)));
1143       peerelement = GST_PAD_PARENT(GST_PAD_PEER(pad));
1144       if (GST_ELEMENT_SCHED(GST_PAD_PARENT(pad)) == GST_ELEMENT_SCHED(peerelement)) {
1145         GST_DEBUG(GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",GST_ELEMENT_NAME(peerelement));
1146         // if it's not already in a chain, add it to this one
1147         if (gst_schedule_find_chain (chain->sched, peerelement) == NULL) {
1148           gst_schedule_chain_recursive_add (chain, peerelement);
1149         }
1150       }
1151     }
1152   }
1153 }
1154
1155 void
1156 gst_schedule_pad_disconnect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1157 {
1158   GstScheduleChain *chain;
1159   GstElement *element1, *element2;
1160   GstScheduleChain *chain1, *chain2;
1161
1162   GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
1163             GST_DEBUG_PAD_NAME(srcpad), GST_DEBUG_PAD_NAME(sinkpad));
1164
1165   // we need to have the parent elements of each pad
1166   element1 = GST_ELEMENT(GST_PAD_PARENT(srcpad));
1167   element2 = GST_ELEMENT(GST_PAD_PARENT(sinkpad));
1168
1169   // first task is to remove the old chain they belonged to.
1170   // this can be accomplished by taking either of the elements,
1171   // since they are guaranteed to be in the same chain
1172   // FIXME is it potentially better to make an attempt at splitting cleaner??
1173   chain = gst_schedule_find_chain (sched, element1);
1174   if (chain) {
1175     GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1176     gst_schedule_chain_destroy (chain);
1177   }
1178
1179   // now create a new chain to hold element1 and build it from scratch
1180   chain1 = gst_schedule_chain_new (sched);
1181   gst_schedule_chain_recursive_add (chain1, element1);
1182
1183   // check the other element to see if it landed in the newly created chain
1184   if (gst_schedule_find_chain (sched, element2) == NULL) {
1185     // if not in chain, create chain and build from scratch
1186     chain2 = gst_schedule_chain_new (sched);
1187     gst_schedule_chain_recursive_add (chain2, element2);
1188   }
1189 }
1190
1191 GstPad*
1192 gst_schedule_pad_select (GstSchedule *sched, GList *padlist)
1193 {
1194   GstPad *pad = NULL;
1195   GST_INFO (GST_CAT_SCHEDULING, "performing select");
1196
1197   while (padlist) {
1198     pad = GST_PAD (padlist->data);
1199
1200     GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_select_proxy);
1201
1202     padlist = g_list_next (padlist);
1203   }
1204   if (pad != NULL) {
1205     GstRealPad *peer = GST_RPAD_PEER(pad);
1206     
1207     cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1208
1209     g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
1210     pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1211
1212     g_assert (pad != NULL);
1213     g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
1214   }
1215   return pad;
1216 }
1217
1218 void
1219 gst_schedule_add_element (GstSchedule *sched, GstElement *element)
1220 {
1221   GList *pads;
1222   GstPad *pad;
1223   GstElement *peerelement;
1224   GstScheduleChain *chain;
1225
1226   g_return_if_fail (element != NULL);
1227   g_return_if_fail (GST_IS_ELEMENT(element));
1228
1229   // if it's already in this schedule, don't bother doing anything
1230   if (GST_ELEMENT_SCHED(element) == sched) return;
1231
1232   GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to schedule",
1233     GST_ELEMENT_NAME(element));
1234
1235   // if the element already has a different scheduler, remove the element from it
1236   if (GST_ELEMENT_SCHED(element)) {
1237     gst_schedule_remove_element(GST_ELEMENT_SCHED(element),element);
1238   }
1239
1240   // set the sched pointer in the element itself
1241   GST_ELEMENT_SCHED(element) = sched;
1242
1243   // only deal with elements after this point, not bins
1244   // exception is made for Bin's that are schedulable, like the autoplugger
1245   if (GST_IS_BIN (element) && !GST_FLAG_IS_SET(element, GST_BIN_SELF_SCHEDULABLE)) return;
1246
1247   // first add it to the list of elements that are to be scheduled
1248   sched->elements = g_list_prepend (sched->elements, element);
1249   sched->num_elements++;
1250
1251   // create a chain to hold it, and add
1252   chain = gst_schedule_chain_new (sched);
1253   gst_schedule_chain_add_element (chain, element);
1254
1255   // set the sched pointer in all the pads
1256   pads = element->pads;
1257   while (pads) {
1258     pad = GST_PAD(pads->data);
1259     pads = g_list_next(pads);
1260
1261     // we only operate on real pads
1262     if (!GST_IS_REAL_PAD(pad)) continue;
1263
1264     // set the pad's sched pointer
1265     gst_pad_set_sched (pad, sched);
1266
1267     // if the peer element exists and is a candidate
1268     if (GST_PAD_PEER(pad)) {
1269       peerelement = GST_PAD_PARENT( GST_PAD_PEER (pad) );
1270       if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
1271         GST_INFO (GST_CAT_SCHEDULING, "peer is in same schedule, chaining together");
1272         // make sure that the two elements are in the same chain
1273         gst_schedule_chain_elements (sched,element,peerelement);
1274       }
1275     }
1276   }
1277 }
1278
1279 void
1280 gst_schedule_enable_element (GstSchedule *sched, GstElement *element)
1281 {
1282   GstScheduleChain *chain;
1283
1284   // find the chain the element's in
1285   chain = gst_schedule_find_chain (sched, element);
1286
1287   if (chain)
1288     gst_schedule_chain_enable_element (chain, element);
1289   else
1290     GST_INFO (GST_CAT_SCHEDULING, "element not found in any chain, not enabling");
1291 }
1292
1293 void
1294 gst_schedule_disable_element (GstSchedule *sched, GstElement *element)
1295 {
1296   GstScheduleChain *chain;
1297
1298   // find the chain the element is in
1299   chain = gst_schedule_find_chain (sched, element);
1300
1301   // remove it from the chain
1302   if (chain) {
1303     gst_schedule_chain_disable_element(chain,element);
1304   }
1305 }
1306
1307 void
1308 gst_schedule_remove_element (GstSchedule *sched, GstElement *element)
1309 {
1310   GstScheduleChain *chain;
1311
1312   g_return_if_fail (element != NULL);
1313   g_return_if_fail (GST_IS_ELEMENT(element));
1314
1315   if (g_list_find (sched->elements, element)) {
1316     GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from schedule",
1317       GST_ELEMENT_NAME(element));
1318
1319     // find what chain the element is in
1320     chain = gst_schedule_find_chain(sched, element);
1321
1322     // remove it from its chain
1323     gst_schedule_chain_remove_element (chain, element);
1324
1325     // remove it from the list of elements
1326     sched->elements = g_list_remove (sched->elements, element);
1327     sched->num_elements--;
1328
1329     // unset the scheduler pointer in the element
1330     GST_ELEMENT_SCHED(element) = NULL;
1331   }
1332 }
1333
1334 gboolean
1335 gst_schedule_iterate (GstSchedule *sched)
1336 {
1337   GstBin *bin = GST_BIN(sched->parent);
1338   GList *chains;
1339   GstScheduleChain *chain;
1340   GstElement *entry;
1341   gint num_scheduled = 0;
1342   gboolean eos = FALSE;
1343   GList *elements;
1344
1345   GST_DEBUG_ENTER("(\"%s\")", GST_ELEMENT_NAME (bin));
1346
1347   g_return_val_if_fail (bin != NULL, TRUE);
1348   g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1349 //  g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
1350
1351   // step through all the chains
1352   chains = sched->chains;
1353 //  if (chains == NULL) return FALSE;
1354 g_return_val_if_fail (chains != NULL, FALSE);
1355   while (chains) {
1356     chain = (GstScheduleChain *)(chains->data);
1357     chains = g_list_next (chains);
1358
1359 //    if (!chain->need_scheduling) continue;
1360
1361 //    if (chain->need_cothreads) {
1362       // all we really have to do is switch to the first child
1363       // FIXME this should be lots more intelligent about where to start
1364       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via cothreads\n");
1365
1366       if (chain->elements) {
1367         entry = NULL; //MattH ADDED?
1368 GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_elements);
1369         elements = chain->elements;
1370         while (elements) {
1371           entry = GST_ELEMENT(elements->data);
1372           elements = g_list_next(elements);
1373           if (GST_FLAG_IS_SET(entry,GST_ELEMENT_DECOUPLED)) {
1374             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is DECOUPLED, skipping\n",GST_ELEMENT_NAME(entry));
1375             entry = NULL;
1376           } else if (GST_FLAG_IS_SET(entry,GST_ELEMENT_NO_ENTRY)) {
1377             GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is not valid, skipping\n",GST_ELEMENT_NAME(entry));
1378             entry = NULL;
1379           } else
1380             break;
1381         }
1382         if (entry) {
1383           GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1384           GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1385                GST_ELEMENT_NAME (entry),entry);
1386           cothread_switch (entry->threadstate);
1387
1388           // following is a check to see if the chain was interrupted due to a
1389           // top-half state_change().  (i.e., if there's a pending state.)
1390           //
1391           // if it was, return to gstthread.c::gst_thread_main_loop() to
1392           // execute the state change.
1393           GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n");
1394           if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_VOID_PENDING)
1395           {
1396             GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n",
1397                        GST_STATE_PENDING(GST_SCHEDULE(sched)->parent));
1398             return 0;
1399           }
1400
1401         } else {
1402           GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
1403           eos = TRUE;
1404         }
1405       } else {
1406         GST_INFO (GST_CAT_DATAFLOW,"NO ENABLED ELEMENTS IN CHAIN!!");
1407         eos = TRUE;
1408       }
1409
1410 /*                
1411     } else {
1412       GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via chain-functions\n");
1413
1414       entries = chain->entries;
1415          
1416       g_assert (entries != NULL);
1417      
1418       while (entries) {
1419         entry = GST_ELEMENT (entries->data);
1420         entries = g_list_next (entries);
1421  
1422         GST_DEBUG (GST_CAT_DATAFLOW,"have entry \"%s\"\n",GST_ELEMENT_NAME (entry));
1423   
1424         if (GST_IS_BIN (entry)) {
1425           gst_bin_iterate (GST_BIN (entry));
1426         } else {
1427           pads = entry->pads;
1428           while (pads) {
1429             pad = GST_PAD (pads->data);
1430             if (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) {
1431               GST_DEBUG (GST_CAT_DATAFLOW,"calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
1432               if (GST_REAL_PAD(pad)->getfunc == NULL) 
1433                 fprintf(stderr, "error, no getfunc in \"%s\"\n", GST_ELEMENT_NAME  (entry));
1434               else
1435                 buf = (GST_REAL_PAD(pad)->getfunc)(pad);
1436               if (buf) gst_pad_push(pad,buf);
1437             }
1438             pads = g_list_next (pads);
1439           }
1440         }
1441       }
1442     }*/
1443     num_scheduled++;
1444   }
1445
1446 /*
1447   // check if nothing was scheduled that was ours..
1448   if (!num_scheduled) {
1449     // are there any other elements that are still busy?
1450     if (bin->num_eos_providers) {
1451       GST_LOCK (bin);
1452       GST_DEBUG (GST_CATA_DATAFLOW,"waiting for eos providers\n");
1453       g_cond_wait (bin->eoscond, GST_OBJECT(bin)->lock);  
1454       GST_DEBUG (GST_CAT_DATAFLOW,"num eos providers %d\n", bin->num_eos_providers);
1455       GST_UNLOCK (bin);
1456     }
1457     else {      
1458       gst_element_signal_eos (GST_ELEMENT (bin));
1459       eos = TRUE;
1460     }       
1461   }
1462 */
1463
1464   GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
1465   return !eos;
1466 }
1467
1468
1469
1470 void
1471 gst_schedule_show (GstSchedule *sched)
1472 {
1473   GList *chains, *elements;
1474   GstElement *element;
1475   GstScheduleChain *chain;
1476
1477   if (sched == NULL) {
1478     g_print("schedule doesn't exist for this element\n");
1479     return;
1480   }
1481
1482   g_return_if_fail(GST_IS_SCHEDULE(sched));
1483
1484   g_print("SCHEDULE DUMP FOR MANAGING BIN \"%s\"\n",GST_ELEMENT_NAME(sched->parent));
1485
1486   g_print("schedule has %d elements in it: ",sched->num_elements);
1487   elements = sched->elements;
1488   while (elements) {
1489     element = GST_ELEMENT(elements->data);
1490     elements = g_list_next(elements);
1491
1492     g_print("%s, ",GST_ELEMENT_NAME(element));
1493   }
1494   g_print("\n");
1495
1496   g_print("schedule has %d chains in it\n",sched->num_chains);
1497   chains = sched->chains;
1498   while (chains) {
1499     chain = (GstScheduleChain *)(chains->data);
1500     chains = g_list_next(chains);
1501
1502     g_print("%p: ",chain);
1503
1504     elements = chain->disabled;
1505     while (elements) {
1506       element = GST_ELEMENT(elements->data);
1507       elements = g_list_next(elements);
1508
1509       g_print("!%s, ",GST_ELEMENT_NAME(element));
1510     }
1511
1512     elements = chain->elements;
1513     while (elements) {
1514       element = GST_ELEMENT(elements->data);
1515       elements = g_list_next(elements);
1516
1517       g_print("%s, ",GST_ELEMENT_NAME(element));
1518     }
1519     g_print("\n");
1520   }
1521 }