2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstscheduler.c: Default scheduling code for most cases
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
26 #include "gstscheduler.h"
30 gst_schedule_loopfunc_wrapper (int argc,char *argv[])
32 GstElement *element = GST_ELEMENT (argv);
33 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
35 GST_DEBUG_ENTER("(%d,'%s')",argc,name);
38 GST_DEBUG (GST_CAT_DATAFLOW,"calling loopfunc %s for element %s\n",
39 GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40 (element->loopfunc) (element);
41 GST_DEBUG (GST_CAT_DATAFLOW,"element %s ended loop function\n", name);
42 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
43 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
45 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
50 gst_schedule_chain_wrapper (int argc,char *argv[])
52 GstElement *element = GST_ELEMENT (argv);
53 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
59 GST_DEBUG_ENTER("(\"%s\")",name);
61 GST_DEBUG (GST_CAT_DATAFLOW,"stepping through pads\n");
65 pad = GST_PAD (pads->data);
66 pads = g_list_next (pads);
67 if (!GST_IS_REAL_PAD(pad)) continue;
68 realpad = GST_REAL_PAD(pad);
69 if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SINK) {
70 GST_DEBUG (GST_CAT_DATAFLOW,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
71 buf = gst_pad_pull (pad);
72 GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
73 if (buf) GST_RPAD_CHAINFUNC(realpad) (pad,buf);
74 GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
77 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
78 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
80 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
85 gst_schedule_src_wrapper (int argc,char *argv[])
87 GstElement *element = GST_ELEMENT (argv);
90 GstBuffer *buf = NULL;
91 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
93 GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
98 if (!GST_IS_REAL_PAD(pads->data)) continue;
99 realpad = (GstRealPad*)(pads->data);
100 pads = g_list_next(pads);
101 if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
102 GST_DEBUG (GST_CAT_DATAFLOW,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
103 if (realpad->regiontype != GST_REGION_VOID) {
104 g_return_val_if_fail (GST_RPAD_GETREGIONFUNC(realpad) != NULL, 0);
105 // if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
106 // fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
108 buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad,realpad->regiontype,realpad->offset,realpad->len);
109 realpad->regiontype = GST_REGION_VOID;
111 g_return_val_if_fail (GST_RPAD_GETFUNC(realpad) != NULL, 0);
112 // if (GST_RPAD_GETFUNC(realpad) == NULL)
113 // fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
115 buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
118 GST_DEBUG (GST_CAT_DATAFLOW,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
119 gst_pad_push ((GstPad*)realpad, buf);
122 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
123 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
130 gst_schedule_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
132 GstRealPad *peer = GST_RPAD_PEER(pad);
134 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
135 GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
137 // FIXME this should be bounded
138 // loop until the bufferpen is empty so we can fill it up again
139 while (GST_RPAD_BUFPEN(pad) != NULL) {
140 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
141 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
142 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
144 // we may no longer be the same pad, check.
145 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
146 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
147 pad = (GstPad *)GST_RPAD_PEER(peer);
151 g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
152 // now fill the bufferpen and switch so it can be consumed
153 GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
154 GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
155 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
157 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
161 gst_schedule_select_proxy (GstPad *pad, GstBuffer *buf)
163 GstRealPad *peer = GST_RPAD_PEER(pad);
165 g_print ("select proxy (%s:%s)\n",GST_DEBUG_PAD_NAME(pad));
167 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
168 GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
170 g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
171 // now fill the bufferpen and switch so it can be consumed
172 GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
173 GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
174 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
175 GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
176 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
178 g_print ("done switching\n");
179 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
184 gst_schedule_pullfunc_proxy (GstPad *pad)
187 GstRealPad *peer = GST_RPAD_PEER(pad);
189 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
191 // FIXME this should be bounded
192 // we will loop switching to the peer until it's filled up the bufferpen
193 while (GST_RPAD_BUFPEN(pad) == NULL) {
194 GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
195 GST_ELEMENT_NAME(GST_ELEMENT(GST_PAD_PARENT(pad))),
196 GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
197 cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
199 // we may no longer be the same pad, check.
200 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
201 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
202 pad = (GstPad *)GST_RPAD_PEER(peer);
205 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
207 // now grab the buffer from the pen, clear the pen, and return the buffer
208 buf = GST_RPAD_BUFPEN(pad);
209 GST_RPAD_BUFPEN(pad) = NULL;
214 gst_schedule_pullregionfunc_proxy (GstPad *pad,GstRegionType type,guint64 offset,guint64 len)
217 GstRealPad *peer = GST_RPAD_PEER(pad);
219 GST_DEBUG_ENTER("%s:%s,%d,%lld,%lld",GST_DEBUG_PAD_NAME(pad),type,offset,len);
221 // put the region info into the pad
222 GST_RPAD_REGIONTYPE(pad) = type;
223 GST_RPAD_OFFSET(pad) = offset;
224 GST_RPAD_LEN(pad) = len;
226 // FIXME this should be bounded
227 // we will loop switching to the peer until it's filled up the bufferpen
228 while (GST_RPAD_BUFPEN(pad) == NULL) {
229 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
230 GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
231 cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
233 // we may no longer be the same pad, check.
234 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
235 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
236 pad = (GstPad *)GST_RPAD_PEER(peer);
239 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
241 // now grab the buffer from the pen, clear the pen, and return the buffer
242 buf = GST_RPAD_BUFPEN(pad);
243 GST_RPAD_BUFPEN(pad) = NULL;
249 gst_schedule_cothreaded_chain (GstBin *bin, GstScheduleChain *chain) {
252 cothread_func wrapper_function;
256 GST_DEBUG (GST_CAT_SCHEDULING,"chain is using COTHREADS\n");
258 // first create thread context
259 if (bin->threadcontext == NULL) {
260 GST_DEBUG (GST_CAT_SCHEDULING,"initializing cothread context\n");
261 bin->threadcontext = cothread_init ();
264 // walk through all the chain's elements
265 elements = chain->elements;
267 element = GST_ELEMENT (elements->data);
268 elements = g_list_next (elements);
270 // start out without a wrapper function, we select it later
271 wrapper_function = NULL;
273 // if the element has a loopfunc...
274 if (element->loopfunc != NULL) {
275 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_loopfunc_wrapper);
276 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
278 // otherwise we need to decide what kind of cothread
279 // if it's not DECOUPLED, we decide based on whether it's a source or not
280 if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
281 // if it doesn't have any sinks, it must be a source (duh)
282 if (element->numsinkpads == 0) {
283 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_src_wrapper);
284 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
286 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_chain_wrapper);
287 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
292 // now we have to walk through the pads to set up their state
293 pads = gst_element_get_pad_list (element);
295 pad = GST_PAD (pads->data);
296 pads = g_list_next (pads);
297 if (!GST_IS_REAL_PAD(pad)) continue;
299 // if the element is DECOUPLED or outside the manager, we have to chain
300 if ((wrapper_function == NULL) ||
301 (GST_RPAD_PEER(pad) &&
302 (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
304 // set the chain proxies
305 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
306 GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
307 GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
309 GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
310 GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
311 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
314 // otherwise we really are a cothread
316 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
317 GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
318 GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pushfunc_proxy);
320 GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
321 GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullfunc_proxy);
322 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullregionfunc_proxy);
327 // need to set up the cothread now
328 if (wrapper_function != NULL) {
329 if (element->threadstate == NULL) {
330 // FIXME handle cothread_create returning NULL
331 element->threadstate = cothread_create (bin->threadcontext);
332 GST_DEBUG (GST_CAT_SCHEDULING,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
334 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
335 GST_DEBUG (GST_CAT_SCHEDULING,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
336 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
341 G_GNUC_UNUSED static void
342 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
348 GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
349 // walk through all the elements
350 elements = chain->elements;
352 element = GST_ELEMENT (elements->data);
353 elements = g_list_next (elements);
355 // walk through all the pads
356 pads = gst_element_get_pad_list (element);
358 pad = GST_PAD (pads->data);
359 pads = g_list_next (pads);
360 if (!GST_IS_REAL_PAD(pad)) continue;
362 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
363 GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
364 GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
366 GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
367 GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
368 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
376 gst_bin_schedule_cleanup (GstBin *bin)
381 chains = bin->chains;
383 chain = (_GstBinChain *)(chains->data);
384 chains = g_list_next(chains);
386 // g_list_free(chain->disabled);
387 g_list_free(chain->elements);
388 g_list_free(chain->entries);
392 g_list_free(bin->chains);
398 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
400 GST_DEBUG (GST_CAT_SCHEDULING,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
401 chain->need_scheduling = FALSE;
405 void gst_bin_schedule_func(GstBin *bin) {
408 GSList *pending = NULL;
411 GstElement *peerparent;
413 GstScheduleChain *chain;
415 GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
417 gst_bin_schedule_cleanup(bin);
419 // next we have to find all the separate scheduling chains
420 GST_DEBUG (GST_CAT_SCHEDULING,"attempting to find scheduling chains...\n");
421 // first make a copy of the managed_elements we can mess with
422 elements = g_list_copy (bin->managed_elements);
423 // we have to repeat until the list is empty to get all chains
425 element = GST_ELEMENT (elements->data);
427 // if this is a DECOUPLED element
428 if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
429 // skip this element entirely
430 GST_DEBUG (GST_CAT_SCHEDULING,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
431 elements = g_list_next (elements);
435 GST_DEBUG (GST_CAT_SCHEDULING,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
437 // prime the pending list with the first element off the top
438 pending = g_slist_prepend (NULL, element);
439 // and remove that one from the main list
440 elements = g_list_remove (elements, element);
442 // create a chain structure
443 chain = g_new0 (_GstBinChain, 1);
444 chain->need_scheduling = TRUE;
446 // for each pending element, walk the pipeline
448 // retrieve the top of the stack and pop it
449 element = GST_ELEMENT (pending->data);
450 pending = g_slist_remove (pending, element);
452 // add ourselves to the chain's list of elements
453 GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
454 chain->elements = g_list_prepend (chain->elements, element);
455 chain->num_elements++;
456 gtk_signal_connect (G_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
457 // set the cothreads flag as appropriate
458 if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
459 chain->need_cothreads = TRUE;
460 if (bin->use_cothreads == TRUE)
461 chain->need_cothreads = TRUE;
463 // if we're managed by the current bin, and we're not decoupled,
464 // go find all the peers and add them to the list of elements to check
465 if ((element->manager == GST_ELEMENT(bin)) &&
466 !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
467 // remove ourselves from the outer list of all managed elements
468 // GST_DEBUG (GST_CAT_SCHEDULING,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
469 elements = g_list_remove (elements, element);
471 // if this element is a source, add it as an entry
472 if (element->numsinkpads == 0) {
473 chain->entries = g_list_prepend (chain->entries, element);
474 GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
477 // now we have to walk the pads to find peers
478 pads = gst_element_get_pad_list (element);
480 pad = GST_PAD (pads->data);
481 pads = g_list_next (pads);
482 if (!GST_IS_REAL_PAD(pad)) continue;
483 GST_DEBUG (GST_CAT_SCHEDULING,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
485 if (GST_RPAD_PEER(pad) == NULL) continue;
486 if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
487 g_assert(GST_RPAD_PEER(pad) != NULL);
488 g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
490 peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
492 GST_DEBUG (GST_CAT_SCHEDULING,"peer pad %p\n", GST_RPAD_PEER(pad));
493 // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
494 // only add it if it's in the list of un-visited elements still
495 if ((g_list_find (elements, peerparent) != NULL) ||
496 GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
497 // add the peer element to the pending list
498 GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to list of pending elements\n",
499 GST_ELEMENT_NAME(peerparent));
500 pending = g_slist_prepend (pending, peerparent);
502 // if this is a sink pad, then the element on the other side is an entry
503 if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
504 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
505 chain->entries = g_list_prepend (chain->entries, peerparent);
506 gtk_signal_connect (G_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
507 GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
510 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
515 // add the chain to the bin
516 GST_DEBUG (GST_CAT_SCHEDULING,"have chain with %d elements: ",chain->num_elements);
517 { GList *elements = chain->elements;
519 element = GST_ELEMENT (elements->data);
520 elements = g_list_next(elements);
521 GST_DEBUG_NOPREFIX(GST_CAT_SCHEDULING,"%s, ",GST_ELEMENT_NAME(element));
524 GST_DEBUG_NOPREFIX(GST_CAT_DATAFLOW,"\n");
525 bin->chains = g_list_prepend (bin->chains, chain);
528 // free up the list in case it's full of DECOUPLED elements
529 g_list_free (elements);
531 GST_DEBUG (GST_CAT_SCHEDULING,"\nwe have %d chains to schedule\n",bin->num_chains);
533 // now we have to go through all the chains and schedule them
534 chains = bin->chains;
536 chain = (GstScheduleChain *)(chains->data);
537 chains = g_list_next (chains);
539 // schedule as appropriate
540 if (chain->need_cothreads) {
541 gst_schedule_cothreaded_chain (bin,chain);
543 gst_schedule_chained_chain (bin,chain);
547 GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
553 // ***** check for possible connections outside
554 // get the pad's peer
555 peer = gst_pad_get_peer (pad);
556 // FIXME this should be an error condition, if not disabled
558 // get the parent of the peer of the pad
559 outside = GST_ELEMENT (gst_pad_get_parent (peer));
560 // FIXME this should *really* be an error condition
562 // if it's a source or connection and it's not ours...
563 if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
564 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
565 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
566 GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
567 // GST_DEBUG (0,"PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
568 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
569 // pad->pullfunc = pad->peer->pullfunc;
570 // GST_DEBUG (0,"PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
571 // pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy);
572 GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
573 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
583 } else if (GST_IS_SRC (element)) {
584 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
585 bin->entries = g_list_prepend (bin->entries,element);
587 cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
590 pads = gst_element_get_pad_list (element);
592 pad = GST_PAD(pads->data);
594 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
595 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
596 // set the proxy functions
597 pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
598 GST_DEBUG (0,"pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
599 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
600 GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
601 // set the proxy functions
602 GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
603 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
604 GST_DEBUG (0,"pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
605 &pad->pullfunc,gst_bin_pullfunc_proxy);
606 pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
608 pads = g_list_next (pads);
610 elements = g_list_next (elements);
612 // if there are no entries, we have to pick one at random
613 if (bin->num_entries == 0)
614 bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
617 GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
618 // we have to find which elements will drive an iteration
619 elements = bin->children;
621 element = GST_ELEMENT (elements->data);
622 GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
623 if (GST_IS_BIN (element)) {
624 gst_bin_create_plan (GST_BIN (element));
626 if (GST_IS_SRC (element)) {
627 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
628 bin->entries = g_list_prepend (bin->entries, element);
632 // go through all the pads, set pointers, and check for connections
633 pads = gst_element_get_pad_list (element);
635 pad = GST_PAD (pads->data);
637 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
638 GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
640 // copy the peer's chain function, easy enough
641 GST_DEBUG (0,"copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
642 GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
644 // need to walk through and check for outside connections
645 //FIXME need to do this for all pads
646 // get the pad's peer
647 peer = GST_RPAD_PEER(pad);
649 GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
652 // get the parent of the peer of the pad
653 outside = GST_ELEMENT (GST_RPAD_PARENT(peer));
655 // if it's a connection and it's not ours...
656 if (GST_IS_CONNECTION (outside) &&
657 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
658 gst_info("gstbin: element \"%s\" is the external source Connection "
659 "for internal element \"%s\"\n",
660 GST_ELEMENT_NAME (GST_ELEMENT (outside)),
661 GST_ELEMENT_NAME (GST_ELEMENT (element)));
662 bin->entries = g_list_prepend (bin->entries, outside);
667 GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
669 pads = g_list_next (pads);
672 elements = g_list_next (elements);
680 // If cothreads are needed, we need to not only find elements but
681 // set up cothread states and various proxy functions.
682 if (bin->need_cothreads) {
683 GST_DEBUG (0,"bin is using cothreads\n");
685 // first create thread context
686 if (bin->threadcontext == NULL) {
687 GST_DEBUG (0,"initializing cothread context\n");
688 bin->threadcontext = cothread_init ();
691 // walk through all the children
692 elements = bin->managed_elements;
694 element = GST_ELEMENT (elements->data);
695 elements = g_list_next (elements);
697 // start out with a NULL warpper function, we'll set it if we want a cothread
698 wrapper_function = NULL;
700 // have to decide if we need to or can use a cothreads, and if so which wrapper
701 // first of all, if there's a loopfunc, the decision's already made
702 if (element->loopfunc != NULL) {
703 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
704 GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
706 // otherwise we need to decide if it needs a cothread
707 // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
708 if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
709 (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
710 !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
711 // base it on whether we're going to loop through source or sink pads
712 if (element->numsinkpads == 0)
713 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
715 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
719 // walk through the all the pads for this element, setting proxy functions
720 // the selection of proxy functions depends on whether we're in a cothread or not
721 pads = gst_element_get_pad_list (element);
723 pad = GST_PAD (pads->data);
724 pads = g_list_next (pads);
726 // check to see if someone else gets to set up the element
727 peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
728 if (peer_manager != GST_ELEMENT(bin)) {
729 GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
732 // if the wrapper_function is set, we need to use the proxy functions
733 if (wrapper_function != NULL) {
734 // set up proxy functions
735 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
736 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
737 pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
738 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
739 GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
740 GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
741 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
744 // otherwise we need to set up for 'traditional' chaining
745 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
746 // we can just copy the chain function, since it shares the prototype
747 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
748 GST_DEBUG_PAD_NAME(pad));
749 pad->pushfunc = pad->chainfunc;
750 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
751 // we can just copy the get function, since it shares the prototype
752 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
753 GST_DEBUG_PAD_NAME(pad));
754 pad->pullfunc = pad->getfunc;
759 // if a loopfunc has been specified, create and set up a cothread
760 if (wrapper_function != NULL) {
761 if (element->threadstate == NULL) {
762 element->threadstate = cothread_create (bin->threadcontext);
763 GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
764 &element->threadstate,GST_ELEMENT_NAME (element));
766 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
767 GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
768 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
771 // // HACK: if the element isn't decoupled, it's an entry
772 // if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
773 // bin->entries = g_list_append(bin->entries, element);
776 // otherwise, cothreads are not needed
778 GST_DEBUG (0,"bin is chained, no cothreads needed\n");
780 elements = bin->managed_elements;
782 element = GST_ELEMENT (elements->data);
783 elements = g_list_next (elements);
785 pads = gst_element_get_pad_list (element);
787 pad = GST_PAD (pads->data);
788 pads = g_list_next (pads);
790 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
791 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
792 pad->pushfunc = pad->chainfunc;
794 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
795 pad->pullfunc = pad->getfunc;
803 gst_schedule_lock_element (GstSchedule *sched,GstElement *element)
805 if (element->threadstate)
806 cothread_lock(element->threadstate);
810 gst_schedule_unlock_element (GstSchedule *sched,GstElement *element)
812 if (element->threadstate)
813 cothread_unlock(element->threadstate);
817 /*************** INCREMENTAL SCHEDULING CODE STARTS HERE ***************/
820 static void gst_schedule_class_init (GstScheduleClass *klass);
821 static void gst_schedule_init (GstSchedule *schedule);
823 static GstObjectClass *parent_class = NULL;
825 GType gst_schedule_get_type(void) {
826 static GType schedule_type = 0;
828 if (!schedule_type) {
829 static const GTypeInfo schedule_info = {
830 sizeof(GstScheduleClass),
833 (GClassInitFunc)gst_schedule_class_init,
838 (GInstanceInitFunc)gst_schedule_init,
840 schedule_type = g_type_register_static(GST_TYPE_OBJECT, "GstSchedule", &schedule_info, 0);
842 return schedule_type;
846 gst_schedule_class_init (GstScheduleClass *klass)
848 parent_class = g_type_class_ref(GST_TYPE_OBJECT);
852 gst_schedule_init (GstSchedule *schedule)
854 schedule->add_element = GST_DEBUG_FUNCPTR(gst_schedule_add_element);
855 schedule->remove_element = GST_DEBUG_FUNCPTR(gst_schedule_remove_element);
856 schedule->enable_element = GST_DEBUG_FUNCPTR(gst_schedule_enable_element);
857 schedule->disable_element = GST_DEBUG_FUNCPTR(gst_schedule_disable_element);
858 schedule->lock_element = GST_DEBUG_FUNCPTR(gst_schedule_lock_element);
859 schedule->unlock_element = GST_DEBUG_FUNCPTR(gst_schedule_unlock_element);
860 schedule->pad_connect = GST_DEBUG_FUNCPTR(gst_schedule_pad_connect);
861 schedule->pad_disconnect = GST_DEBUG_FUNCPTR(gst_schedule_pad_disconnect);
862 schedule->pad_select = GST_DEBUG_FUNCPTR(gst_schedule_pad_select);
863 schedule->iterate = GST_DEBUG_FUNCPTR(gst_schedule_iterate);
867 gst_schedule_new(GstElement *parent)
869 GstSchedule *sched = GST_SCHEDULE (g_object_new(GST_TYPE_SCHEDULE,NULL));
871 sched->parent = parent;
877 /* this function will look at a pad and determine if the peer parent is
878 * a possible candidate for connecting up in the same chain. */
880 GstElement *gst_schedule_check_pad (GstSchedule *sched, GstPad *pad) {
882 GstElement *element, *peerelement;
884 GST_INFO (GST_CAT_SCHEDULING, "checking pad %s:%s for peer in scheduler",
885 GST_DEBUG_PAD_NAME(pad));
887 element = GST_ELEMENT(GST_PAD_PARENT(peer));
888 GST_DEBUG(GST_CAT_SCHEDULING, "element is \"%s\"\n",GST_ELEMENT_NAME(element));
890 peer = GST_PAD_PEER (pad);
891 if (peer == NULL) return NULL;
892 peerelement = GST_ELEMENT(GST_PAD_PARENT (peer));
893 if (peerelement == NULL) return NULL;
894 GST_DEBUG(GST_CAT_SCHEDULING, "peer element is \"%s\"\n",GST_ELEMENT_NAME(peerelement));
896 // now check to see if it's in the same schedule
897 if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
898 GST_DEBUG(GST_CAT_SCHEDULING, "peer is in same schedule\n");
902 // otherwise it's not a candidate
908 gst_schedule_chain_new (GstSchedule *sched)
910 GstScheduleChain *chain = g_new (GstScheduleChain, 1);
912 // initialize the chain with sane values
913 chain->sched = sched;
914 chain->disabled = NULL;
915 chain->elements = NULL;
916 chain->num_elements = 0;
918 chain->cothreaded_elements = 0;
919 chain->schedule = FALSE;
921 // add the chain to the schedules' list of chains
922 sched->chains = g_list_prepend (sched->chains, chain);
925 GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
926 chain,sched->num_chains,sched);
932 gst_schedule_chain_destroy (GstScheduleChain *chain)
934 GstSchedule *sched = chain->sched;
936 // remove the chain from the schedules' list of chains
937 chain->sched->chains = g_list_remove (chain->sched->chains, chain);
938 chain->sched->num_chains--;
941 g_list_free (chain->disabled); // should be empty...
942 g_list_free (chain->elements); // ditto
945 GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p",chain,sched->num_chains,sched);
949 gst_schedule_chain_add_element (GstScheduleChain *chain, GstElement *element)
951 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),chain);
953 // set the sched pointer for the element
954 element->sched = chain->sched;
956 // add the element to the list of 'disabled' elements
957 chain->disabled = g_list_prepend (chain->disabled, element);
958 chain->num_elements++;
962 gst_schedule_chain_enable_element (GstScheduleChain *chain, GstElement *element)
964 GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
966 // remove from disabled list
967 chain->disabled = g_list_remove (chain->disabled, element);
969 // add to elements list
970 chain->elements = g_list_prepend (chain->elements, element);
972 // reschedule the chain
973 gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
977 gst_schedule_chain_disable_element (GstScheduleChain *chain, GstElement *element)
979 GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
981 // remove from elements list
982 chain->elements = g_list_remove (chain->elements, element);
984 // add to disabled list
985 chain->disabled = g_list_prepend (chain->disabled, element);
987 // reschedule the chain
988 // FIXME this should be done only if manager state != NULL
989 // gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
993 gst_schedule_chain_remove_element (GstScheduleChain *chain, GstElement *element)
995 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),chain);
997 // if it's active, deactivate it
998 if (g_list_find (chain->elements, element)) {
999 gst_schedule_chain_disable_element (chain, element);
1002 // remove the element from the list of elements
1003 chain->disabled = g_list_remove (chain->disabled, element);
1004 chain->num_elements--;
1006 // if there are no more elements in the chain, destroy the chain
1007 if (chain->num_elements == 0)
1008 gst_schedule_chain_destroy(chain);
1010 // unset the sched pointer for the element
1011 element->sched = NULL;
1015 gst_schedule_chain_elements (GstSchedule *sched, GstElement *element1, GstElement *element2)
1018 GstScheduleChain *chain;
1019 GstScheduleChain *chain1 = NULL, *chain2 = NULL;
1020 GstElement *element;
1022 // first find the chains that hold the two
1023 chains = sched->chains;
1025 chain = (GstScheduleChain *)(chains->data);
1026 chains = g_list_next(chains);
1028 if (g_list_find (chain->disabled,element1))
1030 else if (g_list_find (chain->elements,element1))
1033 if (g_list_find (chain->disabled,element2))
1035 else if (g_list_find (chain->elements,element2))
1039 // first check to see if they're in the same chain, we're done if that's the case
1040 if ((chain1 != NULL) && (chain1 == chain2)) {
1041 GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
1045 // now, if neither element has a chain, create one
1046 if ((chain1 == NULL) && (chain2 == NULL)) {
1047 GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
1048 chain = gst_schedule_chain_new (sched);
1049 gst_schedule_chain_add_element (chain, element1);
1050 gst_schedule_chain_add_element (chain, element2);
1051 // FIXME chain changed here
1052 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1054 // otherwise if both have chains already, join them
1055 } else if ((chain1 != NULL) && (chain2 != NULL)) {
1056 GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p",chain2,chain1);
1057 // take the contents of chain2 and merge them into chain1
1058 chain1->disabled = g_list_concat (chain1->disabled, g_list_copy(chain2->disabled));
1059 chain1->elements = g_list_concat (chain1->elements, g_list_copy(chain2->elements));
1060 chain1->num_elements += chain2->num_elements;
1061 // FIXME chain changed here
1062 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1064 gst_schedule_chain_destroy(chain2);
1066 // otherwise one has a chain already, the other doesn't
1068 // pick out which one has the chain, and which doesn't
1069 if (chain1 != NULL) chain = chain1, element = element2;
1070 else chain = chain2, element = element1;
1072 GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
1073 gst_schedule_chain_add_element (chain, element);
1074 // FIXME chain changed here
1075 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1080 gst_schedule_pad_connect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1082 GstElement *srcelement,*sinkelement;
1084 srcelement = GST_PAD_PARENT(srcpad);
1085 g_return_if_fail(srcelement != NULL);
1086 sinkelement = GST_PAD_PARENT(sinkpad);
1087 g_return_if_fail(sinkelement != NULL);
1089 GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",GST_DEBUG_PAD_NAME(srcpad),GST_DEBUG_PAD_NAME(sinkpad));
1090 GST_DEBUG(GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
1091 GST_ELEMENT_SCHED(srcelement),GST_ELEMENT_SCHED(sinkelement));
1093 if (GST_ELEMENT_SCHED(srcelement) == GST_ELEMENT_SCHED(sinkelement)) {
1094 GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same schedule, chaining together",GST_DEBUG_PAD_NAME(sinkpad));
1095 gst_schedule_chain_elements (sched, srcelement, sinkelement);
1099 // find the chain within the schedule that holds the element, if any
1101 gst_schedule_find_chain (GstSchedule *sched, GstElement *element)
1104 GstScheduleChain *chain;
1106 GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",GST_ELEMENT_NAME(element));
1108 chains = sched->chains;
1110 chain = (GstScheduleChain *)(chains->data);
1111 chains = g_list_next (chains);
1113 if (g_list_find (chain->elements, element))
1115 if (g_list_find (chain->disabled, element))
1123 gst_schedule_chain_recursive_add (GstScheduleChain *chain, GstElement *element)
1127 GstElement *peerelement;
1129 // add the element to the chain
1130 gst_schedule_chain_add_element (chain, element);
1132 GST_DEBUG(GST_CAT_SCHEDULING, "recursing on element \"%s\"\n",GST_ELEMENT_NAME(element));
1133 // now go through all the pads and see which peers can be added
1134 pads = element->pads;
1136 pad = GST_PAD(pads->data);
1137 pads = g_list_next (pads);
1139 GST_DEBUG(GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",GST_DEBUG_PAD_NAME(pad));
1140 // if the peer exists and could be in the same chain
1141 if (GST_PAD_PEER(pad)) {
1142 GST_DEBUG(GST_CAT_SCHEDULING, "has peer %s:%s\n",GST_DEBUG_PAD_NAME(GST_PAD_PEER(pad)));
1143 peerelement = GST_PAD_PARENT(GST_PAD_PEER(pad));
1144 if (GST_ELEMENT_SCHED(GST_PAD_PARENT(pad)) == GST_ELEMENT_SCHED(peerelement)) {
1145 GST_DEBUG(GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",GST_ELEMENT_NAME(peerelement));
1146 // if it's not already in a chain, add it to this one
1147 if (gst_schedule_find_chain (chain->sched, peerelement) == NULL) {
1148 gst_schedule_chain_recursive_add (chain, peerelement);
1156 gst_schedule_pad_disconnect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1158 GstScheduleChain *chain;
1159 GstElement *element1, *element2;
1160 GstScheduleChain *chain1, *chain2;
1162 GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
1163 GST_DEBUG_PAD_NAME(srcpad), GST_DEBUG_PAD_NAME(sinkpad));
1165 // we need to have the parent elements of each pad
1166 element1 = GST_ELEMENT(GST_PAD_PARENT(srcpad));
1167 element2 = GST_ELEMENT(GST_PAD_PARENT(sinkpad));
1169 // first task is to remove the old chain they belonged to.
1170 // this can be accomplished by taking either of the elements,
1171 // since they are guaranteed to be in the same chain
1172 // FIXME is it potentially better to make an attempt at splitting cleaner??
1173 chain = gst_schedule_find_chain (sched, element1);
1175 GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1176 gst_schedule_chain_destroy (chain);
1179 // now create a new chain to hold element1 and build it from scratch
1180 chain1 = gst_schedule_chain_new (sched);
1181 gst_schedule_chain_recursive_add (chain1, element1);
1183 // check the other element to see if it landed in the newly created chain
1184 if (gst_schedule_find_chain (sched, element2) == NULL) {
1185 // if not in chain, create chain and build from scratch
1186 chain2 = gst_schedule_chain_new (sched);
1187 gst_schedule_chain_recursive_add (chain2, element2);
1192 gst_schedule_pad_select (GstSchedule *sched, GList *padlist)
1195 GST_INFO (GST_CAT_SCHEDULING, "performing select");
1198 pad = GST_PAD (padlist->data);
1200 GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_select_proxy);
1202 padlist = g_list_next (padlist);
1205 GstRealPad *peer = GST_RPAD_PEER(pad);
1207 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1209 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
1210 pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1212 g_assert (pad != NULL);
1213 g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
1219 gst_schedule_add_element (GstSchedule *sched, GstElement *element)
1223 GstElement *peerelement;
1224 GstScheduleChain *chain;
1226 g_return_if_fail (element != NULL);
1227 g_return_if_fail (GST_IS_ELEMENT(element));
1229 // if it's already in this schedule, don't bother doing anything
1230 if (GST_ELEMENT_SCHED(element) == sched) return;
1232 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to schedule",
1233 GST_ELEMENT_NAME(element));
1235 // if the element already has a different scheduler, remove the element from it
1236 if (GST_ELEMENT_SCHED(element)) {
1237 gst_schedule_remove_element(GST_ELEMENT_SCHED(element),element);
1240 // set the sched pointer in the element itself
1241 GST_ELEMENT_SCHED(element) = sched;
1243 // only deal with elements after this point, not bins
1244 // exception is made for Bin's that are schedulable, like the autoplugger
1245 if (GST_IS_BIN (element) && !GST_FLAG_IS_SET(element, GST_BIN_SELF_SCHEDULABLE)) return;
1247 // first add it to the list of elements that are to be scheduled
1248 sched->elements = g_list_prepend (sched->elements, element);
1249 sched->num_elements++;
1251 // create a chain to hold it, and add
1252 chain = gst_schedule_chain_new (sched);
1253 gst_schedule_chain_add_element (chain, element);
1255 // set the sched pointer in all the pads
1256 pads = element->pads;
1258 pad = GST_PAD(pads->data);
1259 pads = g_list_next(pads);
1261 // we only operate on real pads
1262 if (!GST_IS_REAL_PAD(pad)) continue;
1264 // set the pad's sched pointer
1265 gst_pad_set_sched (pad, sched);
1267 // if the peer element exists and is a candidate
1268 if (GST_PAD_PEER(pad)) {
1269 peerelement = GST_PAD_PARENT( GST_PAD_PEER (pad) );
1270 if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
1271 GST_INFO (GST_CAT_SCHEDULING, "peer is in same schedule, chaining together");
1272 // make sure that the two elements are in the same chain
1273 gst_schedule_chain_elements (sched,element,peerelement);
1280 gst_schedule_enable_element (GstSchedule *sched, GstElement *element)
1282 GstScheduleChain *chain;
1284 // find the chain the element's in
1285 chain = gst_schedule_find_chain (sched, element);
1288 gst_schedule_chain_enable_element (chain, element);
1290 GST_INFO (GST_CAT_SCHEDULING, "element not found in any chain, not enabling");
1294 gst_schedule_disable_element (GstSchedule *sched, GstElement *element)
1296 GstScheduleChain *chain;
1298 // find the chain the element is in
1299 chain = gst_schedule_find_chain (sched, element);
1301 // remove it from the chain
1303 gst_schedule_chain_disable_element(chain,element);
1308 gst_schedule_remove_element (GstSchedule *sched, GstElement *element)
1310 GstScheduleChain *chain;
1312 g_return_if_fail (element != NULL);
1313 g_return_if_fail (GST_IS_ELEMENT(element));
1315 if (g_list_find (sched->elements, element)) {
1316 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from schedule",
1317 GST_ELEMENT_NAME(element));
1319 // find what chain the element is in
1320 chain = gst_schedule_find_chain(sched, element);
1322 // remove it from its chain
1323 gst_schedule_chain_remove_element (chain, element);
1325 // remove it from the list of elements
1326 sched->elements = g_list_remove (sched->elements, element);
1327 sched->num_elements--;
1329 // unset the scheduler pointer in the element
1330 GST_ELEMENT_SCHED(element) = NULL;
1335 gst_schedule_iterate (GstSchedule *sched)
1337 GstBin *bin = GST_BIN(sched->parent);
1339 GstScheduleChain *chain;
1341 gint num_scheduled = 0;
1342 gboolean eos = FALSE;
1345 GST_DEBUG_ENTER("(\"%s\")", GST_ELEMENT_NAME (bin));
1347 g_return_val_if_fail (bin != NULL, TRUE);
1348 g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1349 // g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
1351 // step through all the chains
1352 chains = sched->chains;
1353 // if (chains == NULL) return FALSE;
1354 g_return_val_if_fail (chains != NULL, FALSE);
1356 chain = (GstScheduleChain *)(chains->data);
1357 chains = g_list_next (chains);
1359 // if (!chain->need_scheduling) continue;
1361 // if (chain->need_cothreads) {
1362 // all we really have to do is switch to the first child
1363 // FIXME this should be lots more intelligent about where to start
1364 GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via cothreads\n");
1366 if (chain->elements) {
1367 entry = NULL; //MattH ADDED?
1368 GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_elements);
1369 elements = chain->elements;
1371 entry = GST_ELEMENT(elements->data);
1372 elements = g_list_next(elements);
1373 if (GST_FLAG_IS_SET(entry,GST_ELEMENT_DECOUPLED)) {
1374 GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is DECOUPLED, skipping\n",GST_ELEMENT_NAME(entry));
1376 } else if (GST_FLAG_IS_SET(entry,GST_ELEMENT_NO_ENTRY)) {
1377 GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is not valid, skipping\n",GST_ELEMENT_NAME(entry));
1383 GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1384 GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1385 GST_ELEMENT_NAME (entry),entry);
1386 cothread_switch (entry->threadstate);
1388 // following is a check to see if the chain was interrupted due to a
1389 // top-half state_change(). (i.e., if there's a pending state.)
1391 // if it was, return to gstthread.c::gst_thread_main_loop() to
1392 // execute the state change.
1393 GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n");
1394 if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_VOID_PENDING)
1396 GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n",
1397 GST_STATE_PENDING(GST_SCHEDULE(sched)->parent));
1402 GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
1406 GST_INFO (GST_CAT_DATAFLOW,"NO ENABLED ELEMENTS IN CHAIN!!");
1412 GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via chain-functions\n");
1414 entries = chain->entries;
1416 g_assert (entries != NULL);
1419 entry = GST_ELEMENT (entries->data);
1420 entries = g_list_next (entries);
1422 GST_DEBUG (GST_CAT_DATAFLOW,"have entry \"%s\"\n",GST_ELEMENT_NAME (entry));
1424 if (GST_IS_BIN (entry)) {
1425 gst_bin_iterate (GST_BIN (entry));
1429 pad = GST_PAD (pads->data);
1430 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) {
1431 GST_DEBUG (GST_CAT_DATAFLOW,"calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
1432 if (GST_REAL_PAD(pad)->getfunc == NULL)
1433 fprintf(stderr, "error, no getfunc in \"%s\"\n", GST_ELEMENT_NAME (entry));
1435 buf = (GST_REAL_PAD(pad)->getfunc)(pad);
1436 if (buf) gst_pad_push(pad,buf);
1438 pads = g_list_next (pads);
1447 // check if nothing was scheduled that was ours..
1448 if (!num_scheduled) {
1449 // are there any other elements that are still busy?
1450 if (bin->num_eos_providers) {
1452 GST_DEBUG (GST_CATA_DATAFLOW,"waiting for eos providers\n");
1453 g_cond_wait (bin->eoscond, GST_OBJECT(bin)->lock);
1454 GST_DEBUG (GST_CAT_DATAFLOW,"num eos providers %d\n", bin->num_eos_providers);
1458 gst_element_signal_eos (GST_ELEMENT (bin));
1464 GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
1471 gst_schedule_show (GstSchedule *sched)
1473 GList *chains, *elements;
1474 GstElement *element;
1475 GstScheduleChain *chain;
1477 if (sched == NULL) {
1478 g_print("schedule doesn't exist for this element\n");
1482 g_return_if_fail(GST_IS_SCHEDULE(sched));
1484 g_print("SCHEDULE DUMP FOR MANAGING BIN \"%s\"\n",GST_ELEMENT_NAME(sched->parent));
1486 g_print("schedule has %d elements in it: ",sched->num_elements);
1487 elements = sched->elements;
1489 element = GST_ELEMENT(elements->data);
1490 elements = g_list_next(elements);
1492 g_print("%s, ",GST_ELEMENT_NAME(element));
1496 g_print("schedule has %d chains in it\n",sched->num_chains);
1497 chains = sched->chains;
1499 chain = (GstScheduleChain *)(chains->data);
1500 chains = g_list_next(chains);
1502 g_print("%p: ",chain);
1504 elements = chain->disabled;
1506 element = GST_ELEMENT(elements->data);
1507 elements = g_list_next(elements);
1509 g_print("!%s, ",GST_ELEMENT_NAME(element));
1512 elements = chain->elements;
1514 element = GST_ELEMENT(elements->data);
1515 elements = g_list_next(elements);
1517 g_print("%s, ",GST_ELEMENT_NAME(element));