2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstscheduler.c: Default scheduling code for most cases
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 //#define GST_DEBUG_ENABLED
24 #include "gst_private.h"
26 #include "gstscheduler.h"
// Cothread entry point for elements that provide a loop function.
// argv is not an argument vector here: the pointer itself is cast to the
// GstElement to run (argc is only echoed in the debug output).
30 gst_bin_loopfunc_wrapper (int argc,char *argv[])
32 GstElement *element = GST_ELEMENT (argv);
33 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
35 GST_DEBUG_ENTER("(%d,'%s')",argc,name);
38 GST_DEBUG (0,"calling loopfunc %s for element %s\n",
39 GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
40 (element->loopfunc) (element);
41 GST_DEBUG (0,"element %s ended loop function\n", name);
// keep re-entering the loop function until the element is flagged as stopping
42 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
// the stop request has been seen, so clear it
43 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
45 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
// Cothread entry point for chain-based elements.
// argv is cast directly to the GstElement to run.  For every real sink pad a
// buffer is pulled with gst_pad_pull() and, when one is obtained, handed to
// that pad's chain function; this repeats until the element is flagged as
// stopping.
50 gst_bin_chain_wrapper (int argc,char *argv[])
52 GstElement *element = GST_ELEMENT (argv);
53 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
59 GST_DEBUG_ENTER("(\"%s\")",name);
60 GST_DEBUG (0,"stepping through pads\n");
64 pad = GST_PAD (pads->data);
// step the cursor before any 'continue' so a skipped pad cannot stall the walk
65 pads = g_list_next (pads);
// pads that are not real pads are skipped
66 if (!GST_IS_REAL_PAD(pad)) continue;
67 realpad = GST_REAL_PAD(pad);
// only sink pads are serviced here
68 if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SINK) {
69 GST_DEBUG (0,"pulling a buffer from %s:%s\n", name, GST_PAD_NAME (pad));
70 buf = gst_pad_pull (pad);
71 GST_DEBUG (0,"calling chain function of %s:%s\n", name, GST_PAD_NAME (pad));
// a NULL buffer is silently dropped; the chain function is not called for it
72 if (buf) GST_RPAD_CHAINFUNC(realpad) (pad,buf);
73 GST_DEBUG (0,"calling chain function of %s:%s done\n", name, GST_PAD_NAME (pad));
// repeat the whole pad walk until the element is flagged as stopping
76 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
// the stop request has been seen, so clear it
77 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
79 GST_DEBUG_LEAVE("(%d,'%s')",argc,name);
84 gst_bin_src_wrapper (int argc,char *argv[])
86 GstElement *element = GST_ELEMENT (argv);
90 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
92 GST_DEBUG_ENTER("(%d,\"%s\")",argc,name);
97 if (!GST_IS_REAL_PAD(pads->data)) continue;
98 realpad = (GstRealPad*)(pads->data);
99 pads = g_list_next(pads);
100 if (GST_RPAD_DIRECTION(realpad) == GST_PAD_SRC) {
101 // region_struct *region = cothread_get_data (element->threadstate, "region");
102 GST_DEBUG (0,"calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
104 // gst_src_push_region (GST_SRC (element), region->offset, region->size);
105 // if (GST_RPAD_GETREGIONFUNC(realpad) == NULL)
106 // fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
107 // buf = (GST_RPAD_GETREGIONFUNC(realpad))((GstPad*)realpad, region->offset, region->size);
109 if (GST_RPAD_GETFUNC(realpad) == NULL)
110 fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
111 buf = GST_RPAD_GETFUNC(realpad) ((GstPad*)realpad);
114 GST_DEBUG (0,"calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(realpad));
115 if (buf) gst_pad_push ((GstPad*)realpad, buf);
118 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
119 GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
// Push proxy for cothreaded pads: parks the buffer in the peer pad's buffer
// pen, then switches to the cothread of this pad's parent element.
126 gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
// cothread state belonging to the element that owns this pad
128 cothread_state *threadstate = GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate;
129 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
130 GST_DEBUG (0,"putting buffer %p in peer's pen\n",buf);
// NOTE(review): any buffer already sitting in the peer's pen is overwritten
// here without being released — confirm the pen is always empty at this point.
131 GST_RPAD_BUFPEN(GST_RPAD_PEER(pad)) = buf;
132 GST_DEBUG (0,"switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate));
133 cothread_switch (threadstate);
134 GST_DEBUG (0,"done switching\n");
// Pull proxy for cothreaded pads: if this pad's buffer pen is empty, switch
// once to the cothread of the pad's parent element, then take whatever is in
// the pen and leave the pen empty.
138 gst_bin_pullfunc_proxy (GstPad *pad)
// cothread state belonging to the element that owns this pad
142 cothread_state *threadstate = GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate;
143 GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
// NOTE(review): only one switch is attempted, so buf below is NULL whenever
// the switch comes back without the pen having been filled.
144 // FIXME this should be a while
145 if (GST_RPAD_BUFPEN(pad) == NULL) {
146 GST_DEBUG (0,"switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate));
147 cothread_switch (threadstate);
149 GST_DEBUG (0,"done switching\n");
// take the buffer out of the pen and mark the pen empty
150 buf = GST_RPAD_BUFPEN(pad);
151 GST_RPAD_BUFPEN(pad) = NULL;
// Proxy taking a single pad.
// NOTE(review): none of the scheduling code in this file installs this
// function on a pad (only the push and pull proxies are installed) — confirm
// whether it is still needed.
156 gst_bin_chainfunc_proxy (GstPad *pad)
// Region variant of the pull proxy.  The region bookkeeping is commented out,
// so the only active statements are the debug trace and a cothread switch.
165 gst_bin_pullregionfunc_proxy (GstPad *pad,
169 // region_struct region;
170 cothread_state *threadstate;
172 GST_DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size);
174 // region.offset = offset;
175 // region.size = size;
// NOTE(review): the assignment below is commented out and no other statement
// here sets threadstate before cothread_switch() reads it — confirm, and
// restore the initialisation (the other proxies in this file use
// GST_ELEMENT(GST_PAD_PARENT(pad))->threadstate).
177 // threadstate = GST_ELEMENT(pad->parent)->threadstate;
178 // cothread_set_data (threadstate, "region", &region);
179 cothread_switch (threadstate);
180 // cothread_set_data (threadstate, "region", NULL);
// Sets up one scheduling chain to run with cothreads.
// For every element of the chain a wrapper function is selected (loop, source
// or chain wrapper; none for DECOUPLED elements without a loop function), the
// push/pull functions of its real pads are pointed either at the cothread
// proxies or straight at the pad's own chain/get functions, and a cothread is
// created for the element when a wrapper was selected.
185 gst_schedule_cothreaded_chain (GstBin *bin, _GstBinChain *chain) {
188 cothread_func wrapper_function;
192 GST_DEBUG (0,"chain is using cothreads\n");
194 // first create thread context
// the context is created once per bin and reused on later calls
195 if (bin->threadcontext == NULL) {
196 GST_DEBUG (0,"initializing cothread context\n");
197 bin->threadcontext = cothread_init ();
200 // walk through all the chain's elements
201 elements = chain->elements;
203 element = GST_ELEMENT (elements->data);
204 elements = g_list_next (elements);
206 // start out without a wrapper function, we select it later
207 wrapper_function = NULL;
209 // if the element has a loopfunc...
210 if (element->loopfunc != NULL) {
211 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
212 GST_DEBUG (0,"\nelement '%s' is a loop-based\n",GST_ELEMENT_NAME(element));
214 // otherwise we need to decide what kind of cothread
215 // if it's not DECOUPLED, we decide based on whether it's a source or not
216 if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
217 // if it doesn't have any sinks, it must be a source (duh)
218 if (element->numsinkpads == 0) {
219 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
220 GST_DEBUG (0,"\nelement '%s' is a source, using _src_wrapper\n",GST_ELEMENT_NAME(element));
222 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
223 GST_DEBUG (0,"\nelement '%s' is a filter, using _chain_wrapper\n",GST_ELEMENT_NAME(element));
228 // now we have to walk through the pads to set up their state
229 pads = gst_element_get_pad_list (element);
231 pad = GST_PAD (pads->data);
232 pads = g_list_next (pads);
// pads that are not real pads are skipped
233 if (!GST_IS_REAL_PAD(pad)) continue;
// NOTE(review): the test below dereferences the pad's peer without a NULL
// check — confirm every real pad is connected by the time this runs.
235 // if the element is DECOUPLED or outside the manager, we have to chain
236 if ((wrapper_function == NULL) ||
237 (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->manager != GST_ELEMENT(bin))) {
238 // set the chain proxies
239 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
240 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
241 GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
243 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
244 GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
245 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
248 // otherwise we really are a cothread
// (this branch reads the direction via gst_pad_get_direction(), the branch
// above via GST_RPAD_DIRECTION)
250 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
251 GST_DEBUG (0,"setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
252 GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
254 GST_DEBUG (0,"setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
255 GST_RPAD_PULLFUNC(pad) = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
260 // need to set up the cothread now
261 if (wrapper_function != NULL) {
// an existing cothread is reused; only its entry function is (re)set below
262 if (element->threadstate == NULL) {
263 element->threadstate = cothread_create (bin->threadcontext);
264 GST_DEBUG (0,"created cothread %p for '%s'\n",element->threadstate,GST_ELEMENT_NAME(element));
// the element pointer travels in the argv slot; the wrappers cast it back
266 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
267 GST_DEBUG (0,"set wrapper function for '%s' to &%s\n",GST_ELEMENT_NAME(element),
268 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
// Sets up one scheduling chain for plain chaining (no cothreads): on every
// real pad of every element in the chain, the push function is pointed at the
// pad's chain function (sink pads) or the pull and pull-region functions at
// the pad's get and get-region functions (other pads).
274 gst_schedule_chained_chain (GstBin *bin, _GstBinChain *chain) {
280 GST_DEBUG (0,"chain entered\n");
281 // walk through all the elements
282 elements = chain->elements;
284 element = GST_ELEMENT (elements->data);
285 elements = g_list_next (elements);
287 // walk through all the pads
288 pads = gst_element_get_pad_list (element);
290 pad = GST_PAD (pads->data);
291 pads = g_list_next (pads);
// pads that are not real pads are skipped
292 if (!GST_IS_REAL_PAD(pad)) continue;
294 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
295 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
296 GST_RPAD_PUSHFUNC(pad) = GST_RPAD_CHAINFUNC(pad);
298 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
299 GST_RPAD_PULLFUNC(pad) = GST_RPAD_GETFUNC(pad);
300 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
// Tears down the scheduling chains previously built for a bin: the element
// and entry lists of every chain are freed, then the list of chains itself.
// NOTE(review): the scheduler connects "eos" handlers that receive a chain
// pointer as user data; nothing here disconnects them — confirm they cannot
// fire with a chain that has been torn down.
307 gst_bin_schedule_cleanup (GstBin *bin)
312 chains = bin->chains;
314 chain = (_GstBinChain *)(chains->data);
315 chains = g_list_next(chains);
// only the list nodes are released; the elements they point to are untouched
317 g_list_free(chain->elements);
318 g_list_free(chain->entries);
322 g_list_free(bin->chains);
// "eos" signal handler connected by gst_bin_schedule_func: marks the chain
// passed as user data as no longer needing scheduling.
328 gst_scheduler_handle_eos (GstElement *element, _GstBinChain *chain)
330 GST_DEBUG (0,"chain removed from scheduler, EOS from element \"%s\"\n", GST_ELEMENT_NAME (element));
331 chain->need_scheduling = FALSE;
// Builds the scheduling plan for a bin.
// Any previously built chains are discarded, the bin's managed elements are
// partitioned into chains of connected elements (starting from each
// not-yet-visited, non-DECOUPLED element and following pad peers), and each
// resulting chain is then set up either for cothreads or for plain chaining.
334 void gst_bin_schedule_func(GstBin *bin) {
// stack of elements still to be visited for the chain being built
337 GSList *pending = NULL;
340 GstElement *peerparent;
344 GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME (GST_ELEMENT (bin)));
346 gst_bin_schedule_cleanup(bin);
348 // next we have to find all the separate scheduling chains
349 GST_DEBUG (0,"\nattempting to find scheduling chains...\n");
350 // first make a copy of the managed_elements we can mess with
351 elements = g_list_copy (bin->managed_elements);
352 // we have to repeat until the list is empty to get all chains
354 element = GST_ELEMENT (elements->data);
356 // if this is a DECOUPLED element
357 if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
358 // skip this element entirely
359 GST_DEBUG (0,"skipping '%s' because it's decoupled\n",GST_ELEMENT_NAME(element));
360 elements = g_list_next (elements);
364 GST_DEBUG (0,"starting with element '%s'\n",GST_ELEMENT_NAME(element));
366 // prime the pending list with the first element off the top
367 pending = g_slist_prepend (NULL, element);
368 // and remove that one from the main list
369 elements = g_list_remove (elements, element);
371 // create a chain structure
372 chain = g_new0 (_GstBinChain, 1);
373 chain->need_scheduling = TRUE;
375 // for each pending element, walk the pipeline
377 // retrieve the top of the stack and pop it
378 element = GST_ELEMENT (pending->data);
379 pending = g_slist_remove (pending, element);
381 // add ourselves to the chain's list of elements
382 GST_DEBUG (0,"adding '%s' to chain\n",GST_ELEMENT_NAME(element));
383 chain->elements = g_list_prepend (chain->elements, element);
384 chain->num_elements++;
// NOTE(review): a handler is connected on every scheduling run and nothing in
// this function disconnects handlers from earlier runs, whose chain data was
// torn down by the cleanup call above — confirm.
385 gtk_signal_connect (GTK_OBJECT (element), "eos", gst_scheduler_handle_eos, chain);
386 // set the cothreads flag as appropriate
// one cothread-requiring element, or the bin-wide setting, makes the whole
// chain use cothreads
387 if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
388 chain->need_cothreads = TRUE;
389 if (bin->use_cothreads == TRUE)
390 chain->need_cothreads = TRUE;
392 // if we're managed by the current bin, and we're not decoupled,
393 // go find all the peers and add them to the list of elements to check
394 if ((element->manager == GST_ELEMENT(bin)) &&
395 !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
396 // remove ourselves from the outer list of all managed elements
397 // GST_DEBUG (0,"removing '%s' from list of possible elements\n",GST_ELEMENT_NAME(element));
// for the chain's first element this repeats the removal already done above;
// g_list_remove leaves the list unchanged when the element is absent
398 elements = g_list_remove (elements, element);
400 // if this element is a source, add it as an entry
401 if (element->numsinkpads == 0) {
402 chain->entries = g_list_prepend (chain->entries, element);
403 GST_DEBUG (0,"added '%s' as SRC entry into the chain\n",GST_ELEMENT_NAME(element));
406 // now we have to walk the pads to find peers
407 pads = gst_element_get_pad_list (element);
409 pad = GST_PAD (pads->data);
410 pads = g_list_next (pads);
// pads that are not real pads are skipped
411 if (!GST_IS_REAL_PAD(pad)) continue;
412 GST_DEBUG (0,"have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
// an unconnected pad is reported and then treated as fatal by the assertions
414 if (GST_RPAD_PEER(pad) == NULL) GST_ERROR(pad,"peer is null!");
415 g_assert(GST_RPAD_PEER(pad) != NULL);
416 g_assert(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))) != NULL);
// element that owns the pad on the other end of this connection
418 peerparent = GST_ELEMENT(GST_PAD_PARENT (GST_PAD(GST_RPAD_PEER(pad))));
420 GST_DEBUG (0,"peer pad %p\n", GST_RPAD_PEER(pad));
421 // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
422 // only add it if it's in the list of un-visited elements still
// DECOUPLED peers are queued regardless of whether they are still in the
// un-visited list
423 if ((g_list_find (elements, peerparent) != NULL) ||
424 GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED)) {
425 // add the peer element to the pending list
426 GST_DEBUG (0,"adding '%s' to list of pending elements\n",
427 GST_ELEMENT_NAME(peerparent));
428 pending = g_slist_prepend (pending, peerparent);
430 // if this is a sink pad, then the element on the other side is an entry
431 if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
432 (GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
433 chain->entries = g_list_prepend (chain->entries, peerparent);
434 gtk_signal_connect (GTK_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
435 GST_DEBUG (0,"added '%s' as DECOUPLED entry into the chain\n",GST_ELEMENT_NAME(peerparent));
438 GST_DEBUG (0,"element '%s' has already been dealt with\n",GST_ELEMENT_NAME(peerparent));
443 // add the chain to the bin
444 GST_DEBUG (0,"have chain with %d elements: ",chain->num_elements);
// debug-only walk that prints the names of the chain's elements; this inner
// 'elements' shadows the outer working list
445 { GList *elements = chain->elements;
447 element = GST_ELEMENT (elements->data);
448 elements = g_list_next(elements);
449 GST_DEBUG_NOPREFIX(0,"%s, ",GST_ELEMENT_NAME(element));
452 GST_DEBUG_NOPREFIX(0,"\n");
453 bin->chains = g_list_prepend (bin->chains, chain);
456 // free up the list in case it's full of DECOUPLED elements
457 g_list_free (elements);
459 GST_DEBUG (0,"\nwe have %d chains to schedule\n",bin->num_chains);
461 // now we have to go through all the chains and schedule them
462 chains = bin->chains;
464 chain = (_GstBinChain *)(chains->data);
465 chains = g_list_next (chains);
467 // schedule as appropriate
468 if (chain->need_cothreads) {
469 gst_schedule_cothreaded_chain (bin,chain);
471 gst_schedule_chained_chain (bin,chain);
475 GST_DEBUG_LEAVE("(\"%s\")",GST_ELEMENT_NAME(GST_ELEMENT(bin)));
// NOTE(review): the statements from here to the end are not enclosed by any
// function visible in this file.  They access pad and bin fields directly
// (pad->pullfunc, bin->entries, bin->need_cothreads) where the functions above
// go through the GST_RPAD_* accessors and per-chain state, so they look like
// an earlier scheduler implementation kept for reference — confirm they are
// disabled, and consider deleting them.
480 // ***** check for possible connections outside
481 // get the pad's peer
482 peer = gst_pad_get_peer (pad);
483 // FIXME this should be an error condition, if not disabled
485 // get the parent of the peer of the pad
486 outside = GST_ELEMENT (gst_pad_get_parent (peer));
487 // FIXME this should *really* be an error condition
489 // if it's a source or connection and it's not ours...
490 if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
491 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
492 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
493 GST_DEBUG (0,"dealing with outside source element %s\n",GST_ELEMENT_NAME(outside));
494 // GST_DEBUG (0,"PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
495 //GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
496 // pad->pullfunc = pad->peer->pullfunc;
497 // GST_DEBUG (0,"PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
498 // pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy);
499 pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
509 } else if (GST_IS_SRC (element)) {
510 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
511 bin->entries = g_list_prepend (bin->entries,element);
513 cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
516 pads = gst_element_get_pad_list (element);
518 pad = GST_PAD(pads->data);
520 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
521 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
522 // set the proxy functions
523 pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
524 GST_DEBUG (0,"pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
525 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
526 GST_DEBUG (0,"setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
527 // set the proxy functions
528 pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
529 GST_DEBUG (0,"pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
530 &pad->pullfunc,gst_bin_pullfunc_proxy);
531 pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
533 pads = g_list_next (pads);
535 elements = g_list_next (elements);
537 // if there are no entries, we have to pick one at random
// ("at random" here means the first child of the bin)
538 if (bin->num_entries == 0)
539 bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
542 GST_DEBUG (0,"don't need cothreads, looking for entry points\n");
543 // we have to find which elements will drive an iteration
544 elements = bin->children;
546 element = GST_ELEMENT (elements->data);
547 GST_DEBUG (0,"found element \"%s\"\n", GST_ELEMENT_NAME (element));
548 if (GST_IS_BIN (element)) {
549 gst_bin_create_plan (GST_BIN (element));
551 if (GST_IS_SRC (element)) {
552 GST_DEBUG (0,"adding '%s' as entry point, because it's a source\n",GST_ELEMENT_NAME (element));
553 bin->entries = g_list_prepend (bin->entries, element);
557 // go through all the pads, set pointers, and check for connections
558 pads = gst_element_get_pad_list (element);
560 pad = GST_PAD (pads->data);
562 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
563 GST_DEBUG (0,"found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
565 // copy the peer's chain function, easy enough
566 GST_DEBUG (0,"copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
567 GST_RPAD_PUSHFUNC(pad) = GST_DEBUG_FUNCPTR(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)));
569 // need to walk through and check for outside connections
570 //FIXME need to do this for all pads
571 // get the pad's peer
572 peer = GST_RPAD_PEER(pad);
574 GST_DEBUG (0,"found SINK pad %s has no peer\n", GST_ELEMENT_NAME (pad));
577 // get the parent of the peer of the pad
578 outside = GST_ELEMENT (GST_RPAD_PARENT(peer));
580 // if it's a connection and it's not ours...
581 if (GST_IS_CONNECTION (outside) &&
582 (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
583 gst_info("gstbin: element \"%s\" is the external source Connection "
584 "for internal element \"%s\"\n",
585 GST_ELEMENT_NAME (GST_ELEMENT (outside)),
586 GST_ELEMENT_NAME (GST_ELEMENT (element)));
587 bin->entries = g_list_prepend (bin->entries, outside);
592 GST_DEBUG (0,"found pad %s\n", GST_ELEMENT_NAME (pad));
594 pads = g_list_next (pads);
597 elements = g_list_next (elements);
605 // If cothreads are needed, we need to not only find elements but
606 // set up cothread states and various proxy functions.
607 if (bin->need_cothreads) {
608 GST_DEBUG (0,"bin is using cothreads\n");
610 // first create thread context
611 if (bin->threadcontext == NULL) {
612 GST_DEBUG (0,"initializing cothread context\n");
613 bin->threadcontext = cothread_init ();
616 // walk through all the children
617 elements = bin->managed_elements;
619 element = GST_ELEMENT (elements->data);
620 elements = g_list_next (elements);
622 // start out with a NULL wrapper function, we'll set it if we want a cothread
623 wrapper_function = NULL;
625 // have to decide if we need to or can use a cothreads, and if so which wrapper
626 // first of all, if there's a loopfunc, the decision's already made
627 if (element->loopfunc != NULL) {
628 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
629 GST_DEBUG (0,"element %s is a loopfunc, must use a cothread\n",GST_ELEMENT_NAME (element));
631 // otherwise we need to decide if it needs a cothread
632 // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
633 if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
634 (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
635 !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
636 // base it on whether we're going to loop through source or sink pads
637 if (element->numsinkpads == 0)
638 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
640 wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
644 // walk through all the pads for this element, setting proxy functions
645 // the selection of proxy functions depends on whether we're in a cothread or not
646 pads = gst_element_get_pad_list (element);
648 pad = GST_PAD (pads->data);
649 pads = g_list_next (pads);
651 // check to see if someone else gets to set up the element
652 peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
653 if (peer_manager != GST_ELEMENT(bin)) {
654 GST_DEBUG (0,"WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
657 // if the wrapper_function is set, we need to use the proxy functions
658 if (wrapper_function != NULL) {
659 // set up proxy functions
660 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
661 GST_DEBUG (0,"setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
662 pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
663 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
664 GST_DEBUG (0,"setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
665 pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
668 // otherwise we need to set up for 'traditional' chaining
669 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
670 // we can just copy the chain function, since it shares the prototype
671 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",
672 GST_DEBUG_PAD_NAME(pad));
673 pad->pushfunc = pad->chainfunc;
674 } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
675 // we can just copy the get function, since it shares the prototype
676 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",
677 GST_DEBUG_PAD_NAME(pad));
678 pad->pullfunc = pad->getfunc;
683 // if a loopfunc has been specified, create and set up a cothread
684 if (wrapper_function != NULL) {
685 if (element->threadstate == NULL) {
686 element->threadstate = cothread_create (bin->threadcontext);
687 GST_DEBUG (0,"created cothread %p (@%p) for \"%s\"\n",element->threadstate,
688 &element->threadstate,GST_ELEMENT_NAME (element));
690 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
691 GST_DEBUG (0,"set wrapper function for \"%s\" to &%s\n",GST_ELEMENT_NAME (element),
692 GST_DEBUG_FUNCPTR_NAME(wrapper_function));
695 // // HACK: if the element isn't decoupled, it's an entry
696 // if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
697 // bin->entries = g_list_append(bin->entries, element);
700 // otherwise, cothreads are not needed
702 GST_DEBUG (0,"bin is chained, no cothreads needed\n");
704 elements = bin->managed_elements;
706 element = GST_ELEMENT (elements->data);
707 elements = g_list_next (elements);
709 pads = gst_element_get_pad_list (element);
711 pad = GST_PAD (pads->data);
712 pads = g_list_next (pads);
714 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
715 GST_DEBUG (0,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
716 pad->pushfunc = pad->chainfunc;
718 GST_DEBUG (0,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
719 pad->pullfunc = pad->getfunc;