2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstscheduler.c: Default scheduling code for most cases
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
26 #include "gstscheduler.h"
// Cothread entry point for elements that supply their own loop function; it is
// the wrapper gst_schedule_cothreaded_chain selects when element->loopfunc is
// non-NULL.  The cothread is registered there with argc 0 and the element
// pointer passed in the argv slot, so argv is cast back to the element instead
// of being treated as a string vector.
// NOTE(review): the return-type line, the braces and the opener of the
// do/while loop are not present in this listing.
30 gst_schedule_loopfunc_wrapper (int argc,char *argv[])
32 GstElement *element = GST_ELEMENT (argv);
// name is only consumed by the debug macros, hence G_GNUC_UNUSED
33 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
35 GST_DEBUG_ENTER("(%d,'%s')",argc,name);
// call the element's loop function again and again until the element has been
// flagged COTHREAD_STOPPING (the test is made after each call)
38 GST_DEBUG (GST_CAT_DATAFLOW,"calling loopfunc %s for element %s\n",
39 GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40 (element->loopfunc) (element);
41 GST_DEBUG (GST_CAT_DATAFLOW,"element %s ended loop function\n", name);
42 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
// clear the stop flag once the loop has been left
43 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
45 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
// Cothread entry point for chain-based elements; gst_schedule_cothreaded_chain
// selects it for non-DECOUPLED elements without a loop function that have sink
// pads (it logs them as a "filter").  Each pass walks element->pads and, for
// every real sink pad, pulls one buffer with gst_pad_pull and hands it to that
// pad's chain function.  Passes repeat until the element is flagged
// COTHREAD_STOPPING.  As with the other wrappers, argv carries the element.
// NOTE(review): the declarations of realpad and buf, the statement guarded by
// the GST_IS_REAL_PAD test, the loop openers and the braces are not present in
// this listing.
50 gst_schedule_chain_wrapper (int argc,char *argv[])
52 GstElement *element = GST_ELEMENT (argv);
// name is only consumed by the debug macros, hence G_GNUC_UNUSED
53 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
55 GST_DEBUG_ENTER("(\"%s\")",name);
57 GST_DEBUG (GST_CAT_DATAFLOW,"stepping through pads\n");
// restart from the head of the pad list on every pass
60 GList *pads = element->pads;
63 GstPad *pad = GST_PAD (pads->data);
// the cursor is advanced before the real-pad test below
66 pads = g_list_next (pads);
67 if (!GST_IS_REAL_PAD(pad))
69 realpad = GST_REAL_PAD(pad);
// only sink pads are serviced here
70 if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
73 GST_DEBUG (GST_CAT_DATAFLOW,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
74 buf = gst_pad_pull (pad);
75 GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
// NOTE(review): the chain function is called without a visible NULL check
77 GST_RPAD_CHAINFUNC(realpad) (pad,buf);
78 GST_DEBUG (GST_CAT_DATAFLOW,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
81 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
// clear the stop flag once the loop has been left
82 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
84 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
// Cothread entry point for source elements; gst_schedule_cothreaded_chain
// selects it for non-DECOUPLED elements without a loop function whose
// numsinkpads is 0.  For every real source pad it obtains a buffer from the
// pad's get function, or from its getregion function when a region request is
// pending (regiontype != GST_REGION_VOID), and pushes it with gst_pad_push.
// Passes repeat until the element is flagged COTHREAD_STOPPING.  As with the
// other wrappers, argv carries the element.
// NOTE(review): the declarations of pads and realpad, the loop openers, the
// else line between the two get paths and the braces are not present in this
// listing.
89 gst_schedule_src_wrapper (int argc,char *argv[])
91 GstElement *element = GST_ELEMENT (argv);
94 GstBuffer *buf = NULL;
// name is only consumed by the debug macros, hence G_GNUC_UNUSED
95 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
97 GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
// restart from the head of the pad list on every pass
100 pads = element->pads;
// Fetch the entry and advance the cursor BEFORE the real-pad test, so that
// "continue" moves on to the next pad instead of re-testing the same one
// forever.  This is the same order gst_schedule_chain_wrapper uses.
102 realpad = (GstRealPad*)(pads->data);
103 pads = g_list_next(pads);
104 if (!GST_IS_REAL_PAD(realpad)) continue;
// only source pads are serviced here
105 if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
106 GST_DEBUG (GST_CAT_DATAFLOW,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
// a region request is pending on this pad: serve it via the getregion function
107 if (realpad->regiontype != GST_REGION_VOID) {
// leaves the wrapper with return value 0 when no getregion function is set
108 g_return_val_if_fail (GST_RPAD_GETREGIONFUNC(realpad) != NULL, 0);
109 // if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
110 // fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
112 buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad,realpad->regiontype,realpad->offset,realpad->len);
// the request has been served, reset it
113 realpad->regiontype = GST_REGION_VOID;
// leaves the wrapper with return value 0 when no get function is set
115 g_return_val_if_fail (GST_RPAD_GETFUNC(realpad) != NULL, 0);
116 // if (GST_RPAD_GETFUNC(realpad) == NULL)
117 // fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
119 buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
122 GST_DEBUG (GST_CAT_DATAFLOW,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
123 gst_pad_push ((GstPad*)realpad, buf);
126 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
// clear the stop flag once the loop has been left
127 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
// Chain handler that gst_schedule_cothreaded_chain installs on sink pads of
// cothreaded elements.  It stores buf in a pad "bufpen" slot and switches to
// the cothread of the pad's parent element so the buffer can be consumed.
// NOTE(review): the wait loop tests GST_RPAD_BUFPEN(pad), while the assertion
// and the store further down use GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) - confirm
// which pen the loop is meant to drain.
// NOTE(review): the return-type line and the braces are not present in this
// listing.
134 gst_schedule_chainhandler_proxy (GstPad *pad, GstBuffer *buf)
// remember the peer so that pad can be re-derived after a cothread switch
136 GstRealPad *peer = GST_RPAD_PEER(pad);
138 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
139 GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer \"%s:%s\"'s pen\n",buf,GST_DEBUG_PAD_NAME(peer));
141 // FIXME this should be bounded
142 // loop until the bufferpen is empty so we can fill it up again
143 while (GST_RPAD_BUFPEN(pad) != NULL) {
144 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
145 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
146 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
148 // we may no longer be the same pad, check.
// if the peer now points at a different pad, continue with that pad
149 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
150 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
151 pad = (GstPad *)GST_RPAD_PEER(peer);
// the peer's pen must be empty before it is overwritten below
155 g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
156 // now fill the bufferpen and switch so it can be consumed
157 GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
158 GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
159 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
161 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
// Variant of the chain proxy used for pad selection: it stores buf in the
// peer's bufpen, records pad in the parent element's select_pad field, clears
// the parent's COTHREAD_STOPPING flag and switches to the parent's cothread.
// Unlike gst_schedule_chainhandler_proxy there is no wait loop: the peer's pen
// is asserted to be empty on entry.
// NOTE(review): the g_print calls write unconditionally (they are not routed
// through GST_DEBUG) - presumably leftover debugging output, confirm.
// NOTE(review): peer is not referenced again in the lines shown here.  The
// return-type line and the braces are not present in this listing.
165 gst_schedule_select_proxy (GstPad *pad, GstBuffer *buf)
167 GstRealPad *peer = GST_RPAD_PEER(pad);
169 g_print ("select proxy (%s:%s)\n",GST_DEBUG_PAD_NAME(pad));
171 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
173 GST_DEBUG (GST_CAT_DATAFLOW,"putting buffer %p in peer's pen\n",buf);
// the peer's pen must be empty before it is overwritten below
175 g_assert (GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) == NULL);
176 // now fill the bufferpen and switch so it can be consumed
177 GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
178 GST_DEBUG (GST_CAT_DATAFLOW,"switching to %p\n",GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
179 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
// tell the parent element which pad the buffer arrived on
180 GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
// make sure the parent is not flagged as stopping before control is handed over
181 GST_FLAG_UNSET(GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
182 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
184 g_print ("done switching\n");
185 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
// Get handler that gst_schedule_cothreaded_chain installs on source pads of
// cothreaded elements.  It keeps switching to the cothread of the pad's parent
// element until the pad's bufpen holds a buffer, then takes the buffer out of
// the pen and empties the pen.
// NOTE(review): the return-type line, the declaration of buf, the braces and
// the statement that hands buf back to the caller are not present in this
// listing.
190 gst_schedule_gethandler_proxy (GstPad *pad)
// remember the peer so that pad can be re-derived after a cothread switch
193 GstRealPad *peer = GST_RPAD_PEER(pad);
195 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
197 // FIXME this should be bounded
198 // we will loop switching to the peer until it's filled up the bufferpen
199 while (GST_RPAD_BUFPEN(pad) == NULL) {
200 GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
201 GST_ELEMENT_NAME(GST_ELEMENT(GST_PAD_PARENT(pad))),
202 GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
203 cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
205 // we may no longer be the same pad, check.
// if the peer now points at a different pad, continue with that pad
206 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
207 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
208 pad = (GstPad *)GST_RPAD_PEER(peer);
211 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
213 // now grab the buffer from the pen, clear the pen, and return the buffer
214 buf = GST_RPAD_BUFPEN(pad);
215 GST_RPAD_BUFPEN(pad) = NULL;
// Region-based counterpart of gst_schedule_gethandler_proxy, installed as the
// pull-region function on source pads of cothreaded elements.  It records the
// requested region (type, offset, len) on the pad, then keeps switching to the
// cothread of the pad's parent element until the pad's bufpen holds a buffer,
// and finally takes the buffer out of the pen and empties the pen.
// gst_schedule_src_wrapper reads the recorded regiontype/offset/len fields.
// NOTE(review): offset and len are guint64 but are formatted with %lld, and
// type is formatted with %d - confirm the format string against the GLib
// integer format macros.
// NOTE(review): the return-type line, the declaration of buf, the braces and
// the statement that hands buf back to the caller are not present in this
// listing.
220 gst_schedule_pullregionfunc_proxy (GstPad *pad,GstRegionType type,guint64 offset,guint64 len)
// remember the peer so that pad can be re-derived after a cothread switch
223 GstRealPad *peer = GST_RPAD_PEER(pad);
225 GST_DEBUG_ENTER("%s:%s,%d,%lld,%lld",GST_DEBUG_PAD_NAME(pad),type,offset,len);
227 // put the region info into the pad
228 GST_RPAD_REGIONTYPE(pad) = type;
229 GST_RPAD_OFFSET(pad) = offset;
230 GST_RPAD_LEN(pad) = len;
232 // FIXME this should be bounded
233 // we will loop switching to the peer until it's filled up the bufferpen
234 while (GST_RPAD_BUFPEN(pad) == NULL) {
235 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
236 GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
237 cothread_switch (GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate);
239 // we may no longer be the same pad, check.
// if the peer now points at a different pad, continue with that pad
240 if (GST_RPAD_PEER(peer) != (GstRealPad *)pad) {
241 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
242 pad = (GstPad *)GST_RPAD_PEER(peer);
245 GST_DEBUG (GST_CAT_DATAFLOW,"done switching\n");
247 // now grab the buffer from the pen, clear the pen, and return the buffer
248 buf = GST_RPAD_BUFPEN(pad);
249 GST_RPAD_BUFPEN(pad) = NULL;
// (Re)builds the cothread scheduling state for one chain.  It creates the
// bin's cothread context on first use and then, for every element in
// chain->elements:
//   1. picks a wrapper function: the loop wrapper when the element has a
//      loopfunc, otherwise the src or chain wrapper for non-DECOUPLED
//      elements; the wrapper stays NULL in the remaining case,
//   2. points every real pad's handlers either at the element's own
//      chain/get/getregion functions or at the cothread-switching proxies,
//   3. creates the element's cothread if it has none yet and installs the
//      wrapper, passing the element itself in the argv slot.
// NOTE(review): the return-type line, the declarations of elements, element,
// pads and pad, the loop openers, the else lines and the closing braces are
// not present in this listing.
255 gst_schedule_cothreaded_chain (GstBin *bin, GstScheduleChain *chain) {
258 cothread_func wrapper_function;
262 GST_DEBUG (GST_CAT_SCHEDULING,"chain is using COTHREADS\n");
264 // first create thread context
265 if (bin->threadcontext == NULL) {
266 GST_DEBUG (GST_CAT_SCHEDULING,"initializing cothread context\n");
267 bin->threadcontext = cothread_init ();
270 // walk through all the chain's elements
271 elements = chain->elements;
273 element = GST_ELEMENT (elements->data);
274 elements = g_list_next (elements);
276 // start out without a wrapper function, we select it later
277 wrapper_function = NULL;
279 // if the element has a loopfunc...
280 if (element->loopfunc != NULL) {
281 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_loopfunc_wrapper);
282 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
284 // otherwise we need to decide what kind of cothread
285 // if it's not DECOUPLED, we decide based on whether it's a source or not
286 if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
287 // if it doesn't have any sinks, it must be a source (duh)
288 if (element->numsinkpads == 0) {
289 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_src_wrapper);
290 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
292 wrapper_function = GST_DEBUG_FUNCPTR(gst_schedule_chain_wrapper);
293 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
298 // now we have to walk through the pads to set up their state
299 pads = gst_element_get_pad_list (element);
301 pad = GST_PAD (pads->data);
302 pads = g_list_next (pads);
// non-real pads are left untouched
303 if (!GST_IS_REAL_PAD(pad)) continue;
305 // if the element is DECOUPLED or outside the manager, we have to chain
// i.e. no wrapper was chosen, or the pad has a peer whose parent element
// belongs to a different sched than this chain
306 if ((wrapper_function == NULL) ||
307 (GST_RPAD_PEER(pad) &&
308 (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
310 // set the chain proxies
// direct case: the handlers are the element's own functions
311 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
312 GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
313 GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
315 GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
316 GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
317 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
320 // otherwise we really are a cothread
// cothreaded case: the handlers are the proxies that switch cothreads
322 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
323 GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
324 GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_chainhandler_proxy);
326 GST_DEBUG (GST_CAT_SCHEDULING,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
327 GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_gethandler_proxy);
328 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_schedule_pullregionfunc_proxy);
333 // need to set up the cothread now
334 if (wrapper_function != NULL) {
// an existing cothread is reused; only its function is replaced below
335 if (element->threadstate == NULL) {
336 // FIXME handle cothread_create returning NULL
337 element->threadstate = cothread_create (bin->threadcontext);
338 GST_DEBUG (GST_CAT_SCHEDULING,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
// argc is 0 and the element is passed in the argv slot; the wrappers cast it back
340 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
341 GST_DEBUG (GST_CAT_SCHEDULING,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
342 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
// Sets up a chain for plain (non-cothreaded) operation: for every real pad of
// every element in chain->elements the handlers are pointed straight at the
// element's own functions (chain function for sink pads; get and getregion
// functions otherwise).  bin is not referenced in the lines shown here.
// NOTE(review): the parameter type is _GstBinChain, while
// gst_bin_schedule_func calls this with a GstScheduleChain pointer - confirm
// the two names refer to the same structure.
// NOTE(review): the declarations of elements, element, pads and pad, the loop
// openers, the else line and the closing braces are not present in this
// listing.
347 G_GNUC_UNUSED static void
348 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
354 GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
355 // walk through all the elements
356 elements = chain->elements;
358 element = GST_ELEMENT (elements->data);
359 elements = g_list_next (elements);
361 // walk through all the pads
362 pads = gst_element_get_pad_list (element);
364 pad = GST_PAD (pads->data);
365 pads = g_list_next (pads);
// non-real pads are left untouched
366 if (!GST_IS_REAL_PAD(pad)) continue;
368 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
369 GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
370 GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
372 GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
373 GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
374 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
// Releases the scheduling chains currently attached to bin: for every chain
// in bin->chains the elements and entries lists are freed, then the
// bin->chains list itself is freed.  Only the list nodes are released by
// g_list_free; the elements they point to are not touched.
// NOTE(review): the lines shown here neither free the chain structures
// themselves nor reset bin->chains after freeing it - those statements may be
// among the lines missing from this listing, confirm.
// NOTE(review): the return-type line, the declarations of chains and chain,
// the loop opener and the braces are not present in this listing.
382 gst_bin_schedule_cleanup (GstBin *bin)
387 chains = bin->chains;
389 chain = (_GstBinChain *)(chains->data);
// advance before the chain's lists are released
390 chains = g_list_next(chains);
392 // g_list_free(chain->disabled);
393 g_list_free(chain->elements);
394 g_list_free(chain->entries);
398 g_list_free(bin->chains);
// "eos" signal handler; gst_bin_schedule_func connects it to each element it
// adds to a chain, with the chain as user data.  When an element signals EOS
// the chain is marked as no longer needing scheduling.
// NOTE(review): the return-type line and the braces are not present in this
// listing.
404 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
406 GST_DEBUG (GST_CAT_SCHEDULING,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
407 chain->need_scheduling = FALSE;
// Rebuilds the scheduling plan of a bin from scratch.
//   1. Drops the existing chains (gst_bin_schedule_cleanup).
//   2. Works on a copy of bin->managed_elements: starting from each
//      not-yet-visited, non-DECOUPLED element it follows pad peers with a
//      pending stack, collecting every reachable element into one chain.
//      Source elements (numsinkpads == 0) and DECOUPLED peers found behind a
//      sink pad are recorded as the chain's entries.  Each chained element
//      gets gst_scheduler_handle_eos connected to its "eos" signal.
//   3. Schedules every chain with gst_schedule_cothreaded_chain or
//      gst_schedule_chained_chain, depending on chain->need_cothreads.
// NOTE(review): the declarations of elements, element, pads, pad and chains,
// the loop openers, several else/continue lines and the closing braces are
// not present in this listing.  bin->num_chains is printed below but no
// increment is visible here.
411 void gst_bin_schedule_func(GstBin *bin) {
// stack of elements still to be visited for the chain being built
414 GSList *pending = NULL;
417 GstElement *peerparent;
419 GstScheduleChain *chain;
421 GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
423 gst_bin_schedule_cleanup(bin);
425 // next we have to find all the separate scheduling chains
426 GST_DEBUG (GST_CAT_SCHEDULING,"attempting to find scheduling chains...\n");
427 // first make a copy of the managed_elements we can mess with
428 elements = g_list_copy (bin->managed_elements);
429 // we have to repeat until the list is empty to get all chains
431 element = GST_ELEMENT (elements->data);
433 // if this is a DECOUPLED element
434 if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
435 // skip this element entirely
436 GST_DEBUG (GST_CAT_SCHEDULING,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
// NOTE(review): this moves the working pointer past the node without
// unlinking it, so the g_list_free at the end appears not to reach the
// skipped nodes - confirm.
437 elements = g_list_next (elements);
441 GST_DEBUG (GST_CAT_SCHEDULING,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
443 // prime the pending list with the first element off the top
444 pending = g_slist_prepend (NULL, element);
445 // and remove that one from the main list
446 elements = g_list_remove (elements, element);
448 // create a chain structure
// NOTE(review): allocated as _GstBinChain but stored in a GstScheduleChain
// pointer - confirm the two names refer to the same structure.
449 chain = g_new0 (_GstBinChain, 1);
450 chain->need_scheduling = TRUE;
452 // for each pending element, walk the pipeline
454 // retrieve the top of the stack and pop it
455 element = GST_ELEMENT (pending->data);
456 pending = g_slist_remove (pending, element);
458 // add ourselves to the chain's list of elements
459 GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
460 chain->elements = g_list_prepend (chain->elements, element);
461 chain->num_elements++;
// EOS from this element marks the chain as no longer needing scheduling
462 gtk_signal_connect (G_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
463 // set the cothreads flag as appropriate
464 if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
465 chain->need_cothreads = TRUE;
466 if (bin->use_cothreads == TRUE)
467 chain->need_cothreads = TRUE;
469 // if we're managed by the current bin, and we're not decoupled,
470 // go find all the peers and add them to the list of elements to check
471 if ((element->manager == GST_ELEMENT(bin)) &&
472 !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
473 // remove ourselves from the outer list of all managed elements
474 // GST_DEBUG (GST_CAT_SCHEDULING,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
475 elements = g_list_remove (elements, element);
477 // if this element is a source, add it as an entry
478 if (element->numsinkpads == 0) {
479 chain->entries = g_list_prepend (chain->entries, element);
480 GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
483 // now we have to walk the pads to find peers
484 pads = gst_element_get_pad_list (element);
486 pad = GST_PAD (pads->data);
487 pads = g_list_next (pads);
488 if (!GST_IS_REAL_PAD(pad)) continue;
489 GST_DEBUG (GST_CAT_SCHEDULING,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
// unconnected pads contribute nothing to the chain
491 if (GST_RPAD_PEER(pad) == NULL) continue;
// NOTE(review): after the continue above, the next check and the first
// assertion can never trigger
492 if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
493 g_assert(GST_RPAD_PEER(pad) != NULL);
494 g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
496 peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
498 GST_DEBUG (GST_CAT_SCHEDULING,"peer pad %p\n", GST_RPAD_PEER(pad));
499 // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
500 // only add it if it's in the list of un-visited elements still
501 if ((g_list_find (elements, peerparent) != NULL) ||
502 GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
503 // add the peer element to the pending list
504 GST_DEBUG (GST_CAT_SCHEDULING,"adding '%s' to list of pending elements\n",
505 GST_ELEMENT_NAME(peerparent));
506 pending = g_slist_prepend (pending, peerparent);
508 // if this is a sink pad, then the element on the other side is an entry
509 if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
510 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
511 chain->entries = g_list_prepend (chain->entries, peerparent);
512 gtk_signal_connect (G_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
513 GST_DEBUG (GST_CAT_SCHEDULING,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
516 GST_DEBUG (GST_CAT_SCHEDULING,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
521 // add the chain to the bin
522 GST_DEBUG (GST_CAT_SCHEDULING,"have chain with %d elements: ",chain->num_elements);
// debug dump of the chain members; this inner elements shadows the outer list
523 { GList *elements = chain->elements;
525 element = GST_ELEMENT (elements->data);
526 elements = g_list_next(elements);
527 GST_DEBUG_NOPREFIX(GST_CAT_SCHEDULING,"%s, ",GST_ELEMENT_NAME(element));
530 GST_DEBUG_NOPREFIX(GST_CAT_DATAFLOW,"\n");
531 bin->chains = g_list_prepend (bin->chains, chain);
534 // free up the list in case it's full of DECOUPLED elements
535 g_list_free (elements);
537 GST_DEBUG (GST_CAT_SCHEDULING,"\nwe have %d chains to schedule\n",bin->num_chains);
539 // now we have to go through all the chains and schedule them
540 chains = bin->chains;
542 chain = (GstScheduleChain *)(chains->data);
543 chains = g_list_next (chains);
545 // schedule as appropriate
546 if (chain->need_cothreads) {
547 gst_schedule_cothreaded_chain (bin,chain);
549 gst_schedule_chained_chain (bin,chain);
553 GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
559 // ***** check for possible connections outside
560 // get the pad's peer
561 peer = gst_pad_get_peer (pad);
562 // FIXME this should be an error condition, if not disabled
564 // get the parent of the peer of the pad
565 outside = GST_ELEMENT (gst_pad_get_parent (peer));
566 // FIXME this should *really* be an error condition
568 // if it's a source or connection and it's not ours...
569 if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
570 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
571 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
572 GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
573 // GST_DEBUG (0,"PUNT: copying gethandler ptr from %s:%s to %s:%s (@ %p)\n",
574 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->gethandler);
575 // pad->gethandler = pad->peer->gethandler;
576 // GST_DEBUG (0,"PUNT: setting chainhandler proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
577 // pad->peer->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_fake_proxy);
578 GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
579 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
589 } else if (GST_IS_SRC (element)) {
590 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
591 bin->entries = g_list_prepend (bin->entries,element);
593 cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
596 pads = gst_element_get_pad_list (element);
598 pad = GST_PAD(pads->data);
600 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
601 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
602 // set the proxy functions
603 pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
604 GST_DEBUG (0,"chainhandler %p = gst_bin_chainhandler_proxy %p\n",&pad->chainhandler,gst_bin_chainhandler_proxy);
605 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
606 GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
607 // set the proxy functions
608 GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
609 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
610 GST_DEBUG (0,"pad->gethandler(@%p) = gst_bin_gethandler_proxy(@%p)\n",
611 &pad->gethandler,gst_bin_gethandler_proxy);
612 pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
614 pads = g_list_next (pads);
616 elements = g_list_next (elements);
618 // if there are no entries, we have to pick one at random
619 if (bin->num_entries == 0)
620 bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
623 GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
624 // we have to find which elements will drive an iteration
625 elements = bin->children;
627 element = GST_ELEMENT (elements->data);
628 GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
629 if (GST_IS_BIN (element)) {
630 gst_bin_create_plan (GST_BIN (element));
632 if (GST_IS_SRC (element)) {
633 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
634 bin->entries = g_list_prepend (bin->entries, element);
638 // go through all the pads, set pointers, and check for connections
639 pads = gst_element_get_pad_list (element);
641 pad = GST_PAD (pads->data);
643 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
644 GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
646 // copy the peer's chain function, easy enough
647 GST_DEBUG (0,"copying peer's chainfunc to %s:%s's chainhandler\n",GST_DEBUG_PAD_NAME(pad));
648 GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
650 // need to walk through and check for outside connections
651 //FIXME need to do this for all pads
652 // get the pad's peer
653 peer = GST_RPAD_PEER(pad);
655 GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
658 // get the parent of the peer of the pad
659 outside = GST_ELEMENT (GST_PAD_PARENT(peer));
661 // if it's a connection and it's not ours...
662 if (GST_IS_CONNECTION (outside) &&
663 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
664 gst_info("gstbin: element \"%s\" is the external source Connection "
665 "for internal element \"%s\"\n",
666 GST_ELEMENT_NAME (GST_ELEMENT (outside)),
667 GST_ELEMENT_NAME (GST_ELEMENT (element)));
668 bin->entries = g_list_prepend (bin->entries, outside);
673 GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
675 pads = g_list_next (pads);
678 elements = g_list_next (elements);
686 // If cothreads are needed, we need to not only find elements but
687 // set up cothread states and various proxy functions.
688 if (bin->need_cothreads) {
689 GST_DEBUG (0,"bin is using cothreads\n");
691 // first create thread context
692 if (bin->threadcontext == NULL) {
693 GST_DEBUG (0,"initializing cothread context\n");
694 bin->threadcontext = cothread_init ();
697 // walk through all the children
698 elements = bin->managed_elements;
700 element = GST_ELEMENT (elements->data);
701 elements = g_list_next (elements);
703 // start out with a NULL warpper function, we'll set it if we want a cothread
704 wrapper_function = NULL;
706 // have to decide if we need to or can use a cothreads, and if so which wrapper
707 // first of all, if there's a loopfunc, the decision's already made
708 if (element->loopfunc != NULL) {
709 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
710 GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
712 // otherwise we need to decide if it needs a cothread
713 // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
714 if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
715 (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
716 !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
717 // base it on whether we're going to loop through source or sink pads
718 if (element->numsinkpads == 0)
719 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
721 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
725 // walk through the all the pads for this element, setting proxy functions
726 // the selection of proxy functions depends on whether we're in a cothread or not
727 pads = gst_element_get_pad_list (element);
729 pad = GST_PAD (pads->data);
730 pads = g_list_next (pads);
732 // check to see if someone else gets to set up the element
733 peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
734 if (peer_manager != GST_ELEMENT(bin)) {
735 GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
738 // if the wrapper_function is set, we need to use the proxy functions
739 if (wrapper_function != NULL) {
740 // set up proxy functions
741 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
742 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
743 pad->chainhandler = GST_DEBUG_FUNCPTR(gst_bin_chainhandler_proxy);
744 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
745 GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
746 GST_RPAD_GETHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_bin_gethandler_proxy);
747 GST_RPAD_PULLREGIONFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
750 // otherwise we need to set up for 'traditional' chaining
751 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
752 // we can just copy the chain function, since it shares the prototype
753 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
754 GST_DEBUG_PAD_NAME(pad));
755 pad->chainhandler = pad->chainfunc;
756 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
757 // we can just copy the get function, since it shares the prototype
758 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
759 GST_DEBUG_PAD_NAME(pad));
760 pad->gethandler = pad->getfunc;
765 // if a loopfunc has been specified, create and set up a cothread
766 if (wrapper_function != NULL) {
767 if (element->threadstate == NULL) {
768 element->threadstate = cothread_create (bin->threadcontext);
769 GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
770 &element->threadstate,GST_ELEMENT_NAME (element));
772 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
773 GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
774 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
777 // // HACK: if the element isn't decoupled, it's an entry
778 // if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
779 // bin->entries = g_list_append(bin->entries, element);
782 // otherwise, cothreads are not needed
784 GST_DEBUG (0,"bin is chained, no cothreads needed\n");
786 elements = bin->managed_elements;
788 element = GST_ELEMENT (elements->data);
789 elements = g_list_next (elements);
791 pads = gst_element_get_pad_list (element);
793 pad = GST_PAD (pads->data);
794 pads = g_list_next (pads);
796 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
797 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
798 pad->chainhandler = pad->chainfunc;
800 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
801 pad->gethandler = pad->getfunc;
// lock_element hook of GstSchedule (installed in gst_schedule_init): locks the
// element's cothread if the element has one, otherwise does nothing.  sched is
// not referenced in the lines shown here.
// NOTE(review): the return-type line and the braces are not present in this
// listing.
809 gst_schedule_lock_element (GstSchedule *sched,GstElement *element)
811 if (element->threadstate)
812 cothread_lock(element->threadstate);
// unlock_element hook of GstSchedule (installed in gst_schedule_init): unlocks
// the element's cothread if the element has one, otherwise does nothing.
// sched is not referenced in the lines shown here.
// NOTE(review): the return-type line and the braces are not present in this
// listing.
816 gst_schedule_unlock_element (GstSchedule *sched,GstElement *element)
818 if (element->threadstate)
819 cothread_unlock(element->threadstate);
823 /*************** INCREMENTAL SCHEDULING CODE STARTS HERE ***************/
// forward declarations of the class and instance initialisers that are
// referenced from the GTypeInfo table in gst_schedule_get_type
826 static void gst_schedule_class_init (GstScheduleClass *klass);
827 static void gst_schedule_init (GstSchedule *schedule);
// parent class reference, filled in by gst_schedule_class_init
829 static GstObjectClass *parent_class = NULL;
// Returns the GType of GstSchedule, registering it as a static type derived
// from GST_TYPE_OBJECT the first time it is called; the id is cached in a
// function-local static afterwards.
// NOTE(review): only some entries of the GTypeInfo initialiser (class size,
// class init, instance init) are present in this listing; the remaining
// entries and the closing braces are missing.
831 GType gst_schedule_get_type(void) {
832 static GType schedule_type = 0;
// register on first use only
834 if (!schedule_type) {
835 static const GTypeInfo schedule_info = {
836 sizeof(GstScheduleClass),
839 (GClassInitFunc)gst_schedule_class_init,
844 (GInstanceInitFunc)gst_schedule_init,
846 schedule_type = g_type_register_static(GST_TYPE_OBJECT, "GstSchedule", &schedule_info, 0);
848 return schedule_type;
// Class initialiser for GstSchedule: takes a reference on the GST_TYPE_OBJECT
// class and keeps it in the file-level parent_class pointer.  klass is not
// referenced in the lines shown here.
// NOTE(review): the return-type line and the braces are not present in this
// listing.
852 gst_schedule_class_init (GstScheduleClass *klass)
854 parent_class = g_type_class_ref(GST_TYPE_OBJECT);
// Instance initialiser for GstSchedule: fills the per-instance function
// pointer table with the default implementations from this file (element
// add/remove/enable/disable/lock/unlock, pad connect/disconnect/select, and
// iterate).
// NOTE(review): the return-type line and the braces are not present in this
// listing.
858 gst_schedule_init (GstSchedule *schedule)
860 schedule->add_element = GST_DEBUG_FUNCPTR(gst_schedule_add_element);
861 schedule->remove_element = GST_DEBUG_FUNCPTR(gst_schedule_remove_element);
862 schedule->enable_element = GST_DEBUG_FUNCPTR(gst_schedule_enable_element);
863 schedule->disable_element = GST_DEBUG_FUNCPTR(gst_schedule_disable_element);
864 schedule->lock_element = GST_DEBUG_FUNCPTR(gst_schedule_lock_element);
865 schedule->unlock_element = GST_DEBUG_FUNCPTR(gst_schedule_unlock_element);
866 schedule->pad_connect = GST_DEBUG_FUNCPTR(gst_schedule_pad_connect);
867 schedule->pad_disconnect = GST_DEBUG_FUNCPTR(gst_schedule_pad_disconnect);
868 schedule->pad_select = GST_DEBUG_FUNCPTR(gst_schedule_pad_select);
869 schedule->iterate = GST_DEBUG_FUNCPTR(gst_schedule_iterate);
// Creates a new GstSchedule object and records parent as its parent element.
// NOTE(review): the return-type line, the braces and the statement that hands
// sched back to the caller are not present in this listing.
873 gst_schedule_new(GstElement *parent)
875 GstSchedule *sched = GST_SCHEDULE (g_object_new(GST_TYPE_SCHEDULE,NULL));
877 sched->parent = parent;
883 /* this function will look at a pad and determine if the peer parent is
884 * a possible candidate for connecting up in the same chain. */
// Returns NULL as soon as the pad has no peer or the peer has no parent
// element.  Otherwise the schedule of the pad's own parent element is compared
// with the schedule of the peer's parent element.  sched is not referenced in
// the lines shown here.
// NOTE(review): the declaration of peer, the return statements of the two
// outcomes of the final comparison and the closing braces are not present in
// this listing.
886 GstElement *gst_schedule_check_pad (GstSchedule *sched, GstPad *pad) {
888 GstElement *element, *peerelement;
890 GST_INFO (GST_CAT_SCHEDULING, "checking pad %s:%s for peer in scheduler",
891 GST_DEBUG_PAD_NAME(pad));
// element is the parent of the pad under inspection.  It must be taken from
// pad: peer has not been assigned at this point, and deriving element from
// peer would make it identical to peerelement, turning the schedule
// comparison below into a tautology.
893 element = GST_ELEMENT(GST_PAD_PARENT(pad));
894 GST_DEBUG(GST_CAT_SCHEDULING, "element is \"%s\"\n",GST_ELEMENT_NAME(element));
896 peer = GST_PAD_PEER (pad);
// an unconnected pad has no candidate
897 if (peer == NULL) return NULL;
898 peerelement = GST_ELEMENT(GST_PAD_PARENT (peer));
// neither has a peer pad without a parent element
899 if (peerelement == NULL) return NULL;
900 GST_DEBUG(GST_CAT_SCHEDULING, "peer element is \"%s\"\n",GST_ELEMENT_NAME(peerelement));
902 // now check to see if it's in the same schedule
903 if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
904 GST_DEBUG(GST_CAT_SCHEDULING, "peer is in same schedule\n");
908 // otherwise it's not a candidate
// Allocates a new, empty GstScheduleChain for sched, initialises the fields
// listed below and prepends the chain to sched->chains.
// NOTE(review): the chain is allocated with g_new, which does not zero the
// memory, so any field not assigned here starts out undefined - confirm that
// every field is covered by the full source.
// NOTE(review): sched->num_chains is reported in the log message but no
// increment is visible in this listing (gst_schedule_chain_destroy decrements
// it).  The return-type line, the braces and the statement that hands chain
// back to the caller are also not present.
914 gst_schedule_chain_new (GstSchedule *sched)
916 GstScheduleChain *chain = g_new (GstScheduleChain, 1);
918 // initialize the chain with sane values
919 chain->sched = sched;
920 chain->disabled = NULL;
921 chain->elements = NULL;
922 chain->num_elements = 0;
924 chain->cothreaded_elements = 0;
925 chain->schedule = FALSE;
927 // add the chain to the schedules' list of chains
928 sched->chains = g_list_prepend (sched->chains, chain);
931 GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
932 chain,sched->num_chains,sched);
// Detaches chain from its schedule (removes it from sched->chains and
// decrements num_chains) and frees the chain's disabled and elements lists.
// NOTE(review): no statement releasing the chain structure itself is visible
// in this listing; the log message still prints the chain pointer afterwards.
// The return-type line and the braces are also not present.
938 gst_schedule_chain_destroy (GstScheduleChain *chain)
// kept in a local so the schedule can still be reported in the final log line
940 GstSchedule *sched = chain->sched;
942 // remove the chain from the schedules' list of chains
943 chain->sched->chains = g_list_remove (chain->sched->chains, chain);
944 chain->sched->num_chains--;
947 g_list_free (chain->disabled); // should be empty...
948 g_list_free (chain->elements); // ditto
951 GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p",chain,sched->num_chains,sched);
/* Add `element` to `chain`.
 *
 * The element's `sched` pointer is set to the chain's schedule, the element
 * is prepended to the chain's `disabled` list (newly added elements are not
 * put on the `elements` list by this function), and the chain's element
 * count is incremented.
 *
 * chain:   the chain receiving the element.
 * element: the element to add. */
955 gst_schedule_chain_add_element (GstScheduleChain *chain, GstElement *element)
957 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),chain);
959 // set the sched pointer for the element
960 element->sched = chain->sched;
962 // add the element to the list of 'disabled' elements
963 chain->disabled = g_list_prepend (chain->disabled, element);
964 chain->num_elements++;
/* Enable `element` within `chain`.
 *
 * The element is moved from the chain's `disabled` list to its `elements`
 * list, and the chain is then rescheduled by calling
 * gst_schedule_cothreaded_chain() with the schedule's parent (as a bin) and
 * the chain.
 *
 * chain:   the chain holding the element.
 * element: the element to enable. */
968 gst_schedule_chain_enable_element (GstScheduleChain *chain, GstElement *element)
970 GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
972 // remove from disabled list
973 chain->disabled = g_list_remove (chain->disabled, element);
975 // add to elements list
976 chain->elements = g_list_prepend (chain->elements, element);
978 // reschedule the chain
979 gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
/* Disable `element` within `chain`.
 *
 * The element is moved from the chain's `elements` list to its `disabled`
 * list.  Unlike the enable path, the chain is not rescheduled here: the
 * call to gst_schedule_cothreaded_chain() is commented out (see the FIXME
 * below).
 *
 * chain:   the chain holding the element.
 * element: the element to disable. */
983 gst_schedule_chain_disable_element (GstScheduleChain *chain, GstElement *element)
985 GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),chain);
987 // remove from elements list
988 chain->elements = g_list_remove (chain->elements, element);
990 // add to disabled list
991 chain->disabled = g_list_prepend (chain->disabled, element);
993 // reschedule the chain
994 // FIXME this should be done only if manager state != NULL
995 // gst_schedule_cothreaded_chain(GST_BIN(chain->sched->parent),chain);
/* Remove `element` from `chain`.
 *
 * If the element is on the chain's `elements` list it is disabled first,
 * which moves it to the `disabled` list; it is then removed from the
 * `disabled` list and the element count is decremented.  When the count
 * reaches zero the chain is destroyed.  Finally the element's `sched`
 * pointer is cleared.
 *
 * chain:   the chain holding the element.
 * element: the element to remove. */
999 gst_schedule_chain_remove_element (GstScheduleChain *chain, GstElement *element)
1001 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),chain);
1003 // if it's active, deactivate it
1004 if (g_list_find (chain->elements, element)) {
1005 gst_schedule_chain_disable_element (chain, element);
1008 // remove the element from the list of elements
1009 chain->disabled = g_list_remove (chain->disabled, element);
1010 chain->num_elements--;
1012 // if there are no more elements in the chain, destroy the chain
1013 if (chain->num_elements == 0)
1014 gst_schedule_chain_destroy(chain);
// `chain` is not used after this point, only `element`
1016 // unset the sched pointer for the element
1017 element->sched = NULL;
/* Make sure `element1` and `element2` end up in the same chain of `sched`.
 *
 * The schedule's chains are searched (both the `disabled` and `elements`
 * list of each) for the two elements, then one of four cases applies:
 *   - both are already in the same chain: nothing to do;
 *   - neither is in a chain: a new chain is created and both are added;
 *   - both are in (different) chains: the second chain's lists are copied
 *     onto the first chain's lists, the element counts are summed, and the
 *     second chain is destroyed;
 *   - only one is in a chain: the other element is added to that chain.
 * None of the paths reschedules the chain; those calls are commented out.
 *
 * sched:    the schedule whose chains are examined.
 * element1: first element.
 * element2: second element.
 *
 * NOTE(review): the loop header and the statements that record the matching
 * chain in chain1/chain2 are not visible in this listing; presumably each
 * successful g_list_find() below stores `chain` in the corresponding
 * variable -- confirm. */
1021 gst_schedule_chain_elements (GstSchedule *sched, GstElement *element1, GstElement *element2)
1024 GstScheduleChain *chain;
1025 GstScheduleChain *chain1 = NULL, *chain2 = NULL;
1026 GstElement *element;
1028 // first find the chains that hold the two
1029 chains = sched->chains;
1031 chain = (GstScheduleChain *)(chains->data);
1032 chains = g_list_next(chains);
1034 if (g_list_find (chain->disabled,element1))
1036 else if (g_list_find (chain->elements,element1))
1039 if (g_list_find (chain->disabled,element2))
1041 else if (g_list_find (chain->elements,element2))
1045 // first check to see if they're in the same chain, we're done if that's the case
1046 if ((chain1 != NULL) && (chain1 == chain2)) {
1047 GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
1051 // now, if neither element has a chain, create one
1052 if ((chain1 == NULL) && (chain2 == NULL)) {
1053 GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
1054 chain = gst_schedule_chain_new (sched);
1055 gst_schedule_chain_add_element (chain, element1);
1056 gst_schedule_chain_add_element (chain, element2);
1057 // FIXME chain changed here
1058 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1060 // otherwise if both have chains already, join them
1061 } else if ((chain1 != NULL) && (chain2 != NULL)) {
1062 GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p",chain2,chain1);
1063 // take the contents of chain2 and merge them into chain1
// copies are appended because chain2's own lists are released when chain2
// is destroyed below
1064 chain1->disabled = g_list_concat (chain1->disabled, g_list_copy(chain2->disabled));
1065 chain1->elements = g_list_concat (chain1->elements, g_list_copy(chain2->elements));
1066 chain1->num_elements += chain2->num_elements;
1067 // FIXME chain changed here
1068 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
1070 gst_schedule_chain_destroy(chain2);
1072 // otherwise one has a chain already, the other doesn't
1074 // pick out which one has the chain, and which doesn't
1075 if (chain1 != NULL) chain = chain1, element = element2;
1076 else chain = chain2, element = element1;
1078 GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
1079 gst_schedule_chain_add_element (chain, element);
1080 // FIXME chain changed here
1081 // gst_schedule_cothreaded_chain(chain->sched->parent,chain);
/* React to `srcpad` and `sinkpad` having been connected.
 *
 * The parent elements of both pads are looked up; if either is NULL the
 * function returns early through g_return_if_fail().  When both elements
 * report the same schedule they are put into the same chain via
 * gst_schedule_chain_elements(); nothing else is visible for the case
 * where the schedules differ.
 *
 * sched:   the schedule passed on to gst_schedule_chain_elements().
 * srcpad:  source side of the new connection.
 * sinkpad: sink side of the new connection. */
1086 gst_schedule_pad_connect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1088 GstElement *srcelement,*sinkelement;
1090 srcelement = GST_PAD_PARENT(srcpad);
1091 g_return_if_fail(srcelement != NULL);
1092 sinkelement = GST_PAD_PARENT(sinkpad);
1093 g_return_if_fail(sinkelement != NULL);
1095 GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",GST_DEBUG_PAD_NAME(srcpad),GST_DEBUG_PAD_NAME(sinkpad));
1096 GST_DEBUG(GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
1097 GST_ELEMENT_SCHED(srcelement),GST_ELEMENT_SCHED(sinkelement));
// the comparison is between the two elements' schedules; `sched` itself is
// only used as the argument to gst_schedule_chain_elements()
1099 if (GST_ELEMENT_SCHED(srcelement) == GST_ELEMENT_SCHED(sinkelement)) {
1100 GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same schedule, chaining together",GST_DEBUG_PAD_NAME(sinkpad));
1101 gst_schedule_chain_elements (sched, srcelement, sinkelement);
1105 // find the chain within the schedule that holds the element, if any
// Each chain of `sched` is examined in list order; an element counts as
// held by a chain when it is on either the chain's `elements` list or its
// `disabled` list.
//
// sched:   the schedule whose chains are searched.
// element: the element to look for.
//
// NOTE(review): the loop header and the return statements are not visible
// in this listing; callers compare the result against NULL, so presumably
// the matching chain is returned and NULL when there is none -- confirm.
1107 gst_schedule_find_chain (GstSchedule *sched, GstElement *element)
1110 GstScheduleChain *chain;
1112 GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",GST_ELEMENT_NAME(element));
1114 chains = sched->chains;
1116 chain = (GstScheduleChain *)(chains->data);
1117 chains = g_list_next (chains);
1119 if (g_list_find (chain->elements, element))
1121 if (g_list_find (chain->disabled, element))
/* Add `element` to `chain`, then recursively add every element reachable
 * through its pads that belongs in the same chain.
 *
 * For each pad of the element that has a peer, the peer's parent element is
 * added (by recursing) when it reports the same schedule as the pad's own
 * parent and gst_schedule_find_chain() does not find it in any chain of
 * the chain's schedule yet.  That last check is what stops the recursion
 * from revisiting elements.
 *
 * chain:   the chain being built.
 * element: the element to add and to start exploring from. */
1129 gst_schedule_chain_recursive_add (GstScheduleChain *chain, GstElement *element)
1133 GstElement *peerelement;
1135 // add the element to the chain
1136 gst_schedule_chain_add_element (chain, element);
1138 GST_DEBUG(GST_CAT_SCHEDULING, "recursing on element \"%s\"\n",GST_ELEMENT_NAME(element));
1139 // now go through all the pads and see which peers can be added
1140 pads = element->pads;
1142 pad = GST_PAD(pads->data);
1143 pads = g_list_next (pads);
1145 GST_DEBUG(GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",GST_DEBUG_PAD_NAME(pad));
1146 // if the peer exists and could be in the same chain
1147 if (GST_PAD_PEER(pad)) {
1148 GST_DEBUG(GST_CAT_SCHEDULING, "has peer %s:%s\n",GST_DEBUG_PAD_NAME(GST_PAD_PEER(pad)));
1149 peerelement = GST_PAD_PARENT(GST_PAD_PEER(pad));
1150 if (GST_ELEMENT_SCHED(GST_PAD_PARENT(pad)) == GST_ELEMENT_SCHED(peerelement)) {
1151 GST_DEBUG(GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",GST_ELEMENT_NAME(peerelement));
1152 // if it's not already in a chain, add it to this one
1153 if (gst_schedule_find_chain (chain->sched, peerelement) == NULL) {
1154 gst_schedule_chain_recursive_add (chain, peerelement);
/* React to `srcpad` and `sinkpad` being disconnected by rebuilding the
 * chains around their parent elements.
 *
 * The chain found for the source pad's element is destroyed.  A new chain
 * is then created and filled by recursing from that element.  If the sink
 * pad's element is not in any chain after that, a second new chain is
 * created and filled by recursing from it.
 *
 * sched:   the schedule whose chains are rebuilt.
 * srcpad:  source side of the former connection.
 * sinkpad: sink side of the former connection.
 *
 * NOTE(review): the condition guarding the "destroying chain" step is not
 * visible in this listing; presumably it runs only when a chain was
 * found -- confirm. */
1162 gst_schedule_pad_disconnect (GstSchedule *sched, GstPad *srcpad, GstPad *sinkpad)
1164 GstScheduleChain *chain;
1165 GstElement *element1, *element2;
1166 GstScheduleChain *chain1, *chain2;
1168 GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
1169 GST_DEBUG_PAD_NAME(srcpad), GST_DEBUG_PAD_NAME(sinkpad));
1171 // we need to have the parent elements of each pad
1172 element1 = GST_ELEMENT(GST_PAD_PARENT(srcpad));
1173 element2 = GST_ELEMENT(GST_PAD_PARENT(sinkpad));
1175 // first task is to remove the old chain they belonged to.
1176 // this can be accomplished by taking either of the elements,
1177 // since they are guaranteed to be in the same chain
1178 // FIXME is it potentially better to make an attempt at splitting cleaner??
1179 chain = gst_schedule_find_chain (sched, element1);
1181 GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1182 gst_schedule_chain_destroy (chain);
1185 // now create a new chain to hold element1 and build it from scratch
1186 chain1 = gst_schedule_chain_new (sched);
1187 gst_schedule_chain_recursive_add (chain1, element1);
1189 // check the other element to see if it landed in the newly created chain
1190 if (gst_schedule_find_chain (sched, element2) == NULL) {
1191 // if not in chain, create chain and build from scratch
1192 chain2 = gst_schedule_chain_new (sched);
1193 gst_schedule_chain_recursive_add (chain2, element2);
/* Perform a "select" over the pads in `padlist`.
 *
 * Visible steps, in order:
 *   1. walk the list and peek each pad with gst_pad_peek(), printing a
 *      message for a pad that has something;
 *   2. walk the list again and install gst_schedule_select_proxy as the
 *      chain handler of each pad;
 *   3. switch to the cothread of the element that owns the peer of `pad`;
 *   4. after the switch, read the `select_pad` field of the element that
 *      owns `pad`, assert that it is non-NULL, and print it.
 *
 * sched:   the schedule (not referenced in the lines visible here).
 * padlist: list of pads to select over.
 *
 * NOTE(review): the loop headers, the declaration of `pad`, and every
 * return statement are missing from this listing, so what is returned on
 * each path cannot be stated from here -- confirm against the full file. */
1198 gst_schedule_pad_select (GstSchedule *sched, GList *padlist)
// second cursor over the list so `padlist` itself is still at the head for
// the second walk below
1201 GList *padlist2 = padlist;
1202 GST_INFO (GST_CAT_SCHEDULING, "performing select");
1205 pad = GST_PAD (padlist2->data);
1207 if (gst_pad_peek (pad)) {
1208 g_print ("found something in pad %s:%s\n", GST_DEBUG_PAD_NAME (pad));
1212 padlist2 = g_list_next (padlist2);
1215 /* else there is nothing ready to consume, set up the select functions */
1217 pad = GST_PAD (padlist->data);
1219 GST_RPAD_CHAINHANDLER(pad) = GST_DEBUG_FUNCPTR(gst_schedule_select_proxy);
1221 padlist = g_list_next (padlist);
// `pad` here is whatever the walk above left in it
1224 GstRealPad *peer = GST_RPAD_PEER(pad);
1226 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1228 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
1229 pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1231 g_assert (pad != NULL);
1232 g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
/* Add `element` to the schedule `sched`.
 *
 * Nothing happens when the element already reports `sched` as its
 * schedule.  If it reports a different schedule it is removed from that
 * one first.  The element's schedule pointer is then set to `sched`.
 *
 * Bins that do not have GST_BIN_SELF_SCHEDULABLE set stop here.  Every
 * other element is prepended to the schedule's element list, counted, and
 * placed in a freshly created chain.  Each of its real pads then has its
 * schedule set, and for every pad whose peer's parent reports the same
 * schedule the two elements are put in the same chain.
 *
 * sched:   the schedule receiving the element.
 * element: the element to add; must be a non-NULL GstElement, otherwise
 *          the g_return_if_fail() checks return early. */
1238 gst_schedule_add_element (GstSchedule *sched, GstElement *element)
1242 GstElement *peerelement;
1243 GstScheduleChain *chain;
1245 g_return_if_fail (element != NULL);
1246 g_return_if_fail (GST_IS_ELEMENT(element));
1248 // if it's already in this schedule, don't bother doing anything
1249 if (GST_ELEMENT_SCHED(element) == sched) return;
1251 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to schedule",
1252 GST_ELEMENT_NAME(element));
1254 // if the element already has a different scheduler, remove the element from it
1255 if (GST_ELEMENT_SCHED(element)) {
1256 gst_schedule_remove_element(GST_ELEMENT_SCHED(element),element);
1259 // set the sched pointer in the element itself
1260 GST_ELEMENT_SCHED(element) = sched;
1262 // only deal with elements after this point, not bins
1263 // exception is made for Bin's that are schedulable, like the autoplugger
1264 if (GST_IS_BIN (element) && !GST_FLAG_IS_SET(element, GST_BIN_SELF_SCHEDULABLE)) return;
1266 // first add it to the list of elements that are to be scheduled
1267 sched->elements = g_list_prepend (sched->elements, element);
1268 sched->num_elements++;
1270 // create a chain to hold it, and add
1271 chain = gst_schedule_chain_new (sched);
1272 gst_schedule_chain_add_element (chain, element);
1274 // set the sched pointer in all the pads
1275 pads = element->pads;
1277 pad = GST_PAD(pads->data);
1278 pads = g_list_next(pads);
1280 // we only operate on real pads
1281 if (!GST_IS_REAL_PAD(pad)) continue;
1283 // set the pad's sched pointer
1284 gst_pad_set_sched (pad, sched);
1286 // if the peer element exists and is a candidate
1287 if (GST_PAD_PEER(pad)) {
1288 peerelement = GST_PAD_PARENT( GST_PAD_PEER (pad) );
1289 if (GST_ELEMENT_SCHED(element) == GST_ELEMENT_SCHED(peerelement)) {
1290 GST_INFO (GST_CAT_SCHEDULING, "peer is in same schedule, chaining together");
1291 // make sure that the two elements are in the same chain
1292 gst_schedule_chain_elements (sched,element,peerelement);
/* Enable `element` in whichever chain of `sched` holds it.
 *
 * The chain is looked up with gst_schedule_find_chain() and the element is
 * enabled in it with gst_schedule_chain_enable_element().  A message is
 * logged for the case where the element is not in any chain.
 *
 * sched:   the schedule to search.
 * element: the element to enable.
 *
 * NOTE(review): the if/else lines that choose between the enable call and
 * the "not found" message are not visible in this listing; presumably the
 * call is made only when a chain was found -- confirm. */
1299 gst_schedule_enable_element (GstSchedule *sched, GstElement *element)
1301 GstScheduleChain *chain;
1303 // find the chain the element's in
1304 chain = gst_schedule_find_chain (sched, element);
1307 gst_schedule_chain_enable_element (chain, element);
1309 GST_INFO (GST_CAT_SCHEDULING, "element not found in any chain, not enabling");
/* Disable `element` in whichever chain of `sched` holds it.
 *
 * The chain is looked up with gst_schedule_find_chain() and the element is
 * disabled in it with gst_schedule_chain_disable_element().  Despite the
 * "remove it from the chain" comment below, the call made is the disable
 * call, not the remove call.
 *
 * sched:   the schedule to search.
 * element: the element to disable.
 *
 * NOTE(review): a line is missing from this listing between the comment
 * and the disable call; presumably it guards against no chain having been
 * found -- confirm. */
1313 gst_schedule_disable_element (GstSchedule *sched, GstElement *element)
1315 GstScheduleChain *chain;
1317 // find the chain the element is in
1318 chain = gst_schedule_find_chain (sched, element);
1320 // remove it from the chain
1322 gst_schedule_chain_disable_element(chain,element);
/* Remove `element` from the schedule `sched`.
 *
 * The work is done only when the element is on the schedule's element
 * list: it is removed from the chain found for it, removed from the
 * schedule's element list, the element count is decremented, and the
 * element's schedule pointer is cleared.
 *
 * sched:   the schedule to remove the element from.
 * element: the element to remove; must be a non-NULL GstElement, otherwise
 *          the g_return_if_fail() checks return early. */
1327 gst_schedule_remove_element (GstSchedule *sched, GstElement *element)
1329 GstScheduleChain *chain;
1331 g_return_if_fail (element != NULL);
1332 g_return_if_fail (GST_IS_ELEMENT(element));
1334 if (g_list_find (sched->elements, element)) {
1335 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from schedule",
1336 GST_ELEMENT_NAME(element));
1338 // find what chain the element is in
1339 chain = gst_schedule_find_chain(sched, element);
// NOTE(review): no NULL check on `chain` is visible before it is handed to
// gst_schedule_chain_remove_element(), which dereferences it; the enable
// path in this file does handle the "not found in any chain" case -- confirm
// whether a guard is needed here
1341 // remove it from its chain
1342 gst_schedule_chain_remove_element (chain, element);
1344 // remove it from the list of elements
1345 sched->elements = g_list_remove (sched->elements, element);
1346 sched->num_elements--;
1348 // unset the scheduler pointer in the element
1349 GST_ELEMENT_SCHED(element) = NULL;
/* Run one iteration of the schedule `sched`.
 *
 * The schedule's parent is treated as a bin.  For each chain that has
 * enabled elements, the elements are scanned for an entry point, skipping
 * those flagged GST_ELEMENT_DECOUPLED or GST_ELEMENT_NO_ENTRY.  The chosen
 * entry gets GST_ELEMENT_COTHREAD_STOPPING set and control is switched to
 * its cothread.  When control comes back, the parent's pending state is
 * checked so that a pending state change can be handled.
 *
 * sched: the schedule to iterate.
 *
 * Visible early exits: TRUE when the parent is NULL or not a bin, FALSE
 * when the schedule has no chains (all through g_return_val_if_fail()).
 *
 * NOTE(review): this listing omits the loop headers, `else`/`break`/
 * `continue`/`return` lines and several local declarations, and may also
 * omit comment or preprocessor lines that disable parts of the body.  In
 * particular it cannot be told from here whether the "chain-functions"
 * section and the end-of-stream section near the bottom are compiled at
 * all -- confirm against the full file before relying on them. */
1354 gst_schedule_iterate (GstSchedule *sched)
1356 GstBin *bin = GST_BIN(sched->parent);
1358 GstScheduleChain *chain;
1360 gint num_scheduled = 0;
1361 gboolean eos = FALSE;
// NOTE(review): `bin` is used for its name here, before the NULL and type
// checks that follow
1364 GST_DEBUG_ENTER("(\"%s\")", GST_ELEMENT_NAME (bin));
1366 g_return_val_if_fail (bin != NULL, TRUE);
1367 g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1368 // g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
1370 // step through all the chains
1371 chains = sched->chains;
1372 // if (chains == NULL) return FALSE;
1373 g_return_val_if_fail (chains != NULL, FALSE);
1375 chain = (GstScheduleChain *)(chains->data);
1376 chains = g_list_next (chains);
1378 // if (!chain->need_scheduling) continue;
1380 // if (chain->need_cothreads) {
1381 // all we really have to do is switch to the first child
1382 // FIXME this should be lots more intelligent about where to start
1383 GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via cothreads\n");
1385 if (chain->elements) {
1386 entry = NULL; //MattH ADDED?
1387 GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_elements);
1388 elements = chain->elements;
// scan the enabled elements for one that may serve as the entry point
1390 entry = GST_ELEMENT(elements->data);
1391 elements = g_list_next(elements);
1392 if (GST_FLAG_IS_SET(entry,GST_ELEMENT_DECOUPLED)) {
1393 GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is DECOUPLED, skipping\n",GST_ELEMENT_NAME(entry));
1395 } else if (GST_FLAG_IS_SET(entry,GST_ELEMENT_NO_ENTRY)) {
1396 GST_DEBUG(GST_CAT_SCHEDULING,"entry \"%s\" is not valid, skipping\n",GST_ELEMENT_NAME(entry));
// the flag is set on the entry element before control is handed to its
// cothread
1402 GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1403 GST_DEBUG (GST_CAT_DATAFLOW,"set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1404 GST_ELEMENT_NAME (entry),entry);
1405 cothread_switch (entry->threadstate);
1407 // following is a check to see if the chain was interrupted due to a
1408 // top-half state_change(). (i.e., if there's a pending state.)
1410 // if it was, return to gstthread.c::gst_thread_main_loop() to
1411 // execute the state change.
1412 GST_DEBUG (GST_CAT_DATAFLOW,"cothread switch ended or interrupted\n");
1413 if (GST_STATE_PENDING(GST_SCHEDULE(sched)->parent) != GST_STATE_VOID_PENDING)
1415 GST_DEBUG (GST_CAT_DATAFLOW,"handle pending state %d\n",
1416 GST_STATE_PENDING(GST_SCHEDULE(sched)->parent));
1421 GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
1425 GST_INFO (GST_CAT_DATAFLOW,"NO ENABLED ELEMENTS IN CHAIN!!");
// chain-function variant: walk the chain's entry elements; a bin entry is
// iterated recursively, otherwise the getfunc of each source pad is called
// and a returned buffer is pushed
1431 GST_DEBUG (GST_CAT_DATAFLOW,"starting iteration via chain-functions\n");
1433 entries = chain->entries;
1435 g_assert (entries != NULL);
1438 entry = GST_ELEMENT (entries->data);
1439 entries = g_list_next (entries);
1441 GST_DEBUG (GST_CAT_DATAFLOW,"have entry \"%s\"\n",GST_ELEMENT_NAME (entry));
1443 if (GST_IS_BIN (entry)) {
1444 gst_bin_iterate (GST_BIN (entry));
1448 pad = GST_PAD (pads->data);
1449 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC) {
1450 GST_DEBUG (GST_CAT_DATAFLOW,"calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
1451 if (GST_REAL_PAD(pad)->getfunc == NULL)
1452 fprintf(stderr, "error, no getfunc in \"%s\"\n", GST_ELEMENT_NAME (entry));
1454 buf = (GST_REAL_PAD(pad)->getfunc)(pad);
1455 if (buf) gst_pad_push(pad,buf);
1457 pads = g_list_next (pads);
1466 // check if nothing was scheduled that was ours..
1467 if (!num_scheduled) {
1468 // are there any other elements that are still busy?
1469 if (bin->num_eos_providers) {
// NOTE(review): GST_CATA_DATAFLOW differs from the GST_CAT_DATAFLOW used on
// every other call in this function; looks like a typo that would only
// build if this region is compiled out -- confirm
1471 GST_DEBUG (GST_CATA_DATAFLOW,"waiting for eos providers\n");
1472 g_cond_wait (bin->eoscond, GST_OBJECT(bin)->lock);
1473 GST_DEBUG (GST_CAT_DATAFLOW,"num eos providers %d\n", bin->num_eos_providers);
1477 gst_element_signal_eos (GST_ELEMENT (bin));
1483 GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
/* Print a human-readable dump of the schedule `sched` with g_print().
 *
 * Output, in order: the name of the schedule's parent, the number of
 * elements and their names, the number of chains, and then for each chain
 * its address followed by its disabled elements (each name prefixed with
 * '!') and its enabled elements.
 *
 * sched: the schedule to dump.  A NULL schedule prints a message saying the
 *        schedule doesn't exist; anything that is not a GstSchedule is
 *        rejected by g_return_if_fail().
 *
 * NOTE(review): the loop headers and the end of the function are not
 * visible in this listing; presumably the NULL case returns right after
 * its message -- confirm. */
1490 gst_schedule_show (GstSchedule *sched)
1492 GList *chains, *elements;
1493 GstElement *element;
1494 GstScheduleChain *chain;
1496 if (sched == NULL) {
1497 g_print("schedule doesn't exist for this element\n");
1501 g_return_if_fail(GST_IS_SCHEDULE(sched));
1503 g_print("SCHEDULE DUMP FOR MANAGING BIN \"%s\"\n",GST_ELEMENT_NAME(sched->parent));
1505 g_print("schedule has %d elements in it: ",sched->num_elements);
1506 elements = sched->elements;
1508 element = GST_ELEMENT(elements->data);
1509 elements = g_list_next(elements);
1511 g_print("%s, ",GST_ELEMENT_NAME(element));
1515 g_print("schedule has %d chains in it\n",sched->num_chains);
1516 chains = sched->chains;
1518 chain = (GstScheduleChain *)(chains->data);
1519 chains = g_list_next(chains);
1521 g_print("%p: ",chain);
// disabled elements first, each marked with a leading '!'
1523 elements = chain->disabled;
1525 element = GST_ELEMENT(elements->data);
1526 elements = g_list_next(elements);
1528 g_print("!%s, ",GST_ELEMENT_NAME(element));
// then the enabled elements
1531 elements = chain->elements;
1533 element = GST_ELEMENT(elements->data);
1534 elements = g_list_next(elements);
1536 g_print("%s, ",GST_ELEMENT_NAME(element));