2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
5 * gstscheduler.c: Default scheduling code for most cases
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 02111-1307, USA.
23 /*#define GST_DEBUG_ENABLED */
/* Short type name for the chain record defined below. */
26 typedef struct _GstSchedulerChain GstSchedulerChain;
/*
 * A chain is this scheduler's grouping of elements. The code in this file
 * accesses these members on it: sched, disabled, elements, num_elements,
 * cothreaded_elements and schedule.
 */
28 struct _GstSchedulerChain {
/* set to 0 by gst_basic_scheduler_chain_new() */
38 gint cothreaded_elements;
/* cached type id; stays 0 until gst_basic_scheduler_get_type() registers the type */
42 static GType _gst_basic_scheduler_type = 0;
/* class / instance initialisers handed to the type system in gst_basic_scheduler_get_type() */
44 static void gst_basic_scheduler_class_init (GstSchedulerClass * klass);
45 static void gst_basic_scheduler_init (GstScheduler * scheduler);
/* GObject dispose override, installed in gst_basic_scheduler_class_init() */
47 static void gst_basic_scheduler_dispose (GObject *object);
/* GstSchedulerClass method implementations, installed in gst_basic_scheduler_class_init() */
49 static void gst_basic_scheduler_setup (GstScheduler *sched);
50 static void gst_basic_scheduler_reset (GstScheduler *sched);
51 static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element);
52 static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element);
53 static void gst_basic_scheduler_enable_element (GstScheduler *sched, GstElement *element);
54 static void gst_basic_scheduler_disable_element (GstScheduler *sched, GstElement *element);
55 static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
56 static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
57 static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
58 static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
59 static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist);
60 static gboolean gst_basic_scheduler_iterate (GstScheduler *sched);
/* NOTE(review): gst_basic_scheduler_show is not assigned to any class slot in
 * gst_basic_scheduler_class_init() -- confirm this prototype is still needed */
62 static void gst_basic_scheduler_show (GstScheduler *sched);
/* parent class, referenced in class_init and used by dispose to chain up */
64 static GstSchedulerClass *parent_class = NULL;
/*
 * Return the GType of the basic scheduler, registering it on first use.
 * The type is registered under the name "GstBasicScheduler" as a child of
 * GST_TYPE_SCHEDULER and its id is cached in _gst_basic_scheduler_type.
 */
67 gst_basic_scheduler_get_type (void)
/* a zero id means the type has not been registered yet */
69 if (!_gst_basic_scheduler_type) {
70 static const GTypeInfo scheduler_info = {
/* class structure size: the parent's class struct is used unchanged */
71 sizeof (GstSchedulerClass),
74 (GClassInitFunc) gst_basic_scheduler_class_init,
/* instance structure size: the parent's instance struct is used unchanged */
77 sizeof (GstScheduler),
79 (GInstanceInitFunc) gst_basic_scheduler_init,
83 _gst_basic_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, "GstBasicScheduler", &scheduler_info, 0);
85 return _gst_basic_scheduler_type;
/*
 * Class initialiser: takes a reference to the parent class, overrides the
 * GObject dispose handler and fills the GstSchedulerClass method slots with
 * the gst_basic_scheduler_* implementations from this file.
 */
89 gst_basic_scheduler_class_init (GstSchedulerClass * klass)
91 GObjectClass *gobject_class;
92 GstObjectClass *gstobject_class;
/* both pointers are casts of the same class structure */
94 gobject_class = (GObjectClass*)klass;
/* NOTE(review): gstobject_class is assigned but not used in the lines visible here */
95 gstobject_class = (GstObjectClass*)klass;
/* kept so gst_basic_scheduler_dispose() can chain up */
97 parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
99 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
/* scheduler methods */
101 klass->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
102 klass->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
103 klass->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
104 klass->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
105 klass->enable_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_enable_element);
106 klass->disable_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_disable_element);
107 klass->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
108 klass->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
109 klass->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
110 klass->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
111 klass->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
112 klass->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
/*
 * Instance initialiser; it is the GInstanceInitFunc registered for the type
 * in gst_basic_scheduler_get_type().
 */
116 gst_basic_scheduler_init (GstScheduler *scheduler)
/*
 * GObject dispose handler (installed in class_init). It chains up to the
 * dispose implementation of the parent class.
 */
121 gst_basic_scheduler_dispose (GObject *object)
124 G_OBJECT_CLASS (parent_class)->dispose (object);
/*
 * Plugin entry point: names the plugin and registers a scheduler factory
 * called "basic" that creates objects of gst_basic_scheduler_get_type().
 * A g_warning() is emitted on the path where the factory is not added.
 */
128 plugin_init (GModule *module, GstPlugin *plugin)
130 GstSchedulerFactory *factory;
132 gst_plugin_set_longname (plugin, "A basic scheduler");
134 factory = gst_schedulerfactory_new ("basic",
135 "A basic scheduler, it uses cothreads",
136 gst_basic_scheduler_get_type());
/* only a successfully created factory is added to the plugin as a feature */
138 if (factory != NULL) {
139 gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
142 g_warning ("could not register scheduler: basic");
/* Plugin descriptor object; non-static, so it has external linkage. */
147 GstPluginDesc plugin_desc = {
/*
 * Cothread entry function for elements that have a loop function.
 * argv is not an argument vector: it carries the GstElement pointer
 * (gst_basic_scheduler_cothreaded_chain() passes the element cast to
 * char ** together with an argc of 0).
 * The element's loopfunc is called repeatedly until the element has the
 * COTHREAD_STOPPING flag set; the flag is cleared before leaving.
 */
155 gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
157 GstElement *element = GST_ELEMENT (argv);
/* only used by the debug macros, hence G_GNUC_UNUSED */
158 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
160 GST_DEBUG_ENTER ("(%d,'%s')", argc, name);
163 GST_DEBUG (GST_CAT_DATAFLOW, "calling loopfunc %s for element %s\n",
164 GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
165 (element->loopfunc) (element);
166 GST_DEBUG (GST_CAT_DATAFLOW, "element %s ended loop function\n", name);
167 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
/* reset the stop request so the cothread can be run again */
168 GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
170 GST_DEBUG_LEAVE ("(%d,'%s')", argc, name);
/*
 * Cothread entry function for chain-based elements.
 * argv carries the GstElement pointer (see gst_basic_scheduler_cothreaded_chain()).
 * For the element's real sink pads it pulls a buffer and either forwards it
 * as an event (when it is an event and the element is not event aware) or
 * passes it to the pad's chain function. This repeats until the element has
 * the COTHREAD_STOPPING flag set; the flag is cleared before leaving.
 */
175 gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
177 GstElement *element = GST_ELEMENT (argv);
/* only used by the debug macros, hence G_GNUC_UNUSED */
178 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
180 GST_DEBUG_ENTER ("(\"%s\")", name);
182 GST_DEBUG (GST_CAT_DATAFLOW, "stepping through pads\n");
/* each pass starts again from the head of the element's pad list */
185 GList *pads = element->pads;
188 GstPad *pad = GST_PAD (pads->data);
/* advance first, so the real-pad test below cannot stall the walk */
191 pads = g_list_next (pads);
192 if (!GST_IS_REAL_PAD (pad))
194 realpad = GST_REAL_PAD (pad);
/* only sink pads are pulled from */
195 if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
198 GST_DEBUG (GST_CAT_DATAFLOW, "pulling data from %s:%s\n", name, GST_PAD_NAME (pad));
199 buf = gst_pad_pull (pad);
/* events are not handed to the chain function of elements that are not event aware */
201 if (GST_IS_EVENT (buf) && !GST_ELEMENT_IS_EVENT_AWARE (element)) {
202 /*gst_pad_event_default (pad, GST_EVENT (buf)); */
203 gst_pad_send_event (pad, GST_EVENT (buf));
206 GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s\n", name,
208 GST_RPAD_CHAINFUNC (realpad) (pad, buf);
211 GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
215 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
/* reset the stop request so the cothread can be run again */
216 GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
218 GST_DEBUG_LEAVE ("(%d,'%s')", argc, name);
/*
 * Cothread entry function for source elements.
 * argv carries the GstElement pointer (see gst_basic_scheduler_cothreaded_chain()).
 * For the element's real source pads it obtains a buffer from the pad's get
 * or getregion function and pushes it on that pad. This repeats until the
 * element has the COTHREAD_STOPPING flag set; the flag is cleared before
 * leaving. Returns 0 early (via g_return_val_if_fail) when the function it
 * needs is not set on the pad.
 */
223 gst_basic_scheduler_src_wrapper (int argc, char *argv[])
225 GstElement *element = GST_ELEMENT (argv);
228 GstBuffer *buf = NULL;
/* only used by the debug macros, hence G_GNUC_UNUSED */
229 G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
231 GST_DEBUG_ENTER ("(%d,\"%s\")", argc, name);
/* each pass starts again from the head of the element's pad list */
234 pads = element->pads;
/* NOTE(review): this test runs before the list is advanced (two lines below);
 * if the skipped branch restarts the pad loop, a non-real pad would be
 * examined forever. gst_basic_scheduler_chain_wrapper() advances first --
 * confirm and align. */
236 if (!GST_IS_REAL_PAD (pads->data))
238 realpad = (GstRealPad *) (pads->data);
239 pads = g_list_next (pads);
/* only source pads produce data */
240 if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
241 GST_DEBUG (GST_CAT_DATAFLOW, "calling _getfunc for %s:%s\n", GST_DEBUG_PAD_NAME (realpad));
/* a region request stored on the pad is served by the getregion function
 * and then cleared; the plain get function is used further down */
242 if (realpad->regiontype != GST_REGION_VOID) {
243 g_return_val_if_fail (GST_RPAD_GETREGIONFUNC (realpad) != NULL, 0);
244 /* if (GST_RPAD_GETREGIONFUNC(realpad) == NULL) */
245 /* fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); */
248 (GST_RPAD_GETREGIONFUNC (realpad)) ((GstPad *) realpad, realpad->regiontype,
249 realpad->offset, realpad->len);
250 realpad->regiontype = GST_REGION_VOID;
253 g_return_val_if_fail (GST_RPAD_GETFUNC (realpad) != NULL, 0);
254 /* if (GST_RPAD_GETFUNC(realpad) == NULL) */
255 /* fprintf(stderr,"error, no getfunc in \"%s\"\n", name); */
257 buf = GST_RPAD_GETFUNC (realpad) ((GstPad *) realpad);
260 GST_DEBUG (GST_CAT_DATAFLOW, "calling gst_pad_push on pad %s:%s\n",
261 GST_DEBUG_PAD_NAME (realpad));
262 gst_pad_push ((GstPad *) realpad, buf);
265 } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
/* reset the stop request so the cothread can be run again */
266 GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
268 GST_DEBUG_LEAVE ("");
/*
 * Chain handler installed on cothreaded sink pads (see
 * gst_basic_scheduler_cothreaded_chain()).
 * It stores buf in the buffer pen of the pad's peer and then switches to the
 * cothread of the pad's parent element so the buffer can be consumed.
 *
 * NOTE(review): the waiting loop tests GST_RPAD_BUFPEN (pad), while the
 * assertion and the store below use the pen of GST_RPAD_PEER (pad) --
 * confirm which pen the loop is meant to wait on.
 */
273 gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
/* remembered so a pad change during a cothread switch can be detected */
275 GstRealPad *peer = GST_RPAD_PEER (pad);
277 GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
278 GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer \"%s:%s\"'s pen\n", buf,
279 GST_DEBUG_PAD_NAME (peer));
281 /* FIXME this should be bounded
282 * loop until the bufferpen is empty so we can fill it up again
284 while (GST_RPAD_BUFPEN (pad) != NULL) {
285 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
286 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
287 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
289 /* we may no longer be the same pad, check. */
290 if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
291 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
292 pad = (GstPad *) GST_RPAD_PEER (peer);
/* the pen must be empty, otherwise the store below would overwrite a buffer */
296 g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
297 /* now fill the bufferpen and switch so it can be consumed */
298 GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
299 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
300 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
301 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
303 GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
/*
 * Chain handler installed by gst_basic_scheduler_pad_select() on the pads
 * being selected on.
 * It stores buf in the pen of the pad's peer, records the pad in its parent
 * element's select_pad field, clears the parent's COTHREAD_STOPPING flag and
 * switches to the parent element's cothread.
 *
 * NOTE(review): the g_print() calls print unconditionally, next to the
 * GST_DEBUG output -- they look like leftover debugging; confirm.
 */
307 gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
309 g_print ("select proxy (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
311 GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
313 GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer's pen\n", buf);
/* the pen must be empty, otherwise the store below would overwrite a buffer */
315 g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
316 /* now fill the bufferpen and switch so it can be consumed */
317 GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
318 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
319 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
320 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
321 gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
/* gst_basic_scheduler_pad_select() reads select_pad after it is switched back to */
322 GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
323 GST_FLAG_UNSET (GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
324 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
326 g_print ("done switching\n");
327 GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
/*
 * Get handler installed on cothreaded source pads (see
 * gst_basic_scheduler_cothreaded_chain()).
 * While the pad's buffer pen is empty it switches to the cothread of the
 * pad's parent element; afterwards it takes the buffer out of the pen and
 * clears the pen.
 */
332 gst_basic_scheduler_gethandler_proxy (GstPad * pad)
/* remembered so a pad change during a cothread switch can be detected */
335 GstRealPad *peer = GST_RPAD_PEER (pad);
337 GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
339 /* FIXME this should be bounded */
340 /* we will loop switching to the peer until it's filled up the bufferpen */
341 while (GST_RPAD_BUFPEN (pad) == NULL) {
342 GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
343 GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))),
344 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
345 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
347 /* we may no longer be the same pad, check. */
348 if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
349 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
350 pad = (GstPad *) GST_RPAD_PEER (peer);
353 GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
355 /* now grab the buffer from the pen, clear the pen, and return the buffer */
356 buf = GST_RPAD_BUFPEN (pad);
357 GST_RPAD_BUFPEN (pad) = NULL;
/*
 * Pull-region handler installed on cothreaded source pads (see
 * gst_basic_scheduler_cothreaded_chain()).
 * It records the requested region (type, offset, len) on the pad, then works
 * like gst_basic_scheduler_gethandler_proxy(): while the pad's pen is empty
 * it switches to the parent element's cothread, and finally it takes the
 * buffer out of the pen and clears the pen.
 *
 * NOTE(review): offset and len are guint64 but are printed with "%lld", a
 * signed conversion -- confirm and consider G_GUINT64_FORMAT.
 */
363 gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guint64 offset, guint64 len)
/* remembered so a pad change during a cothread switch can be detected */
366 GstRealPad *peer = GST_RPAD_PEER (pad);
368 GST_DEBUG_ENTER ("%s:%s,%d,%lld,%lld", GST_DEBUG_PAD_NAME (pad), type, offset, len);
370 /* put the region info into the pad */
371 GST_RPAD_REGIONTYPE (pad) = type;
372 GST_RPAD_OFFSET (pad) = offset;
373 GST_RPAD_LEN (pad) = len;
375 /* FIXME this should be bounded */
376 /* we will loop switching to the peer until it's filled up the bufferpen */
377 while (GST_RPAD_BUFPEN (pad) == NULL) {
378 GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
379 GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
380 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
382 /* we may no longer be the same pad, check. */
383 if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
384 GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!\n");
385 pad = (GstPad *) GST_RPAD_PEER (peer);
388 GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
390 /* now grab the buffer from the pen, clear the pen, and return the buffer */
391 buf = GST_RPAD_BUFPEN (pad);
392 GST_RPAD_BUFPEN (pad) = NULL;
/*
 * (Re)schedule a chain using cothreads.
 * For every active element of the chain it selects a cothread wrapper
 * (loop, source or chain wrapper; none for DECOUPLED elements without a
 * loop function), sets the handlers on each real pad to either the element's
 * own functions or the cothread proxies, and creates/configures the
 * element's cothread in the bin's cothread context.
 * bin->threadcontext must already exist (asserted).
 */
398 gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
402 cothread_func wrapper_function;
406 GST_DEBUG (GST_CAT_SCHEDULING, "chain is using COTHREADS\n");
/* the context is created in gst_basic_scheduler_setup() */
408 g_assert (bin->threadcontext != NULL);
411 /* walk through all the chain's elements */
412 elements = chain->elements;
414 element = GST_ELEMENT (elements->data);
415 elements = g_list_next (elements);
417 /* start out without a wrapper function, we select it later */
418 wrapper_function = NULL;
420 /* if the element has a loopfunc... */
421 if (element->loopfunc != NULL) {
422 wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_loopfunc_wrapper);
423 GST_DEBUG (GST_CAT_SCHEDULING, "element '%s' is a loop-based\n", GST_ELEMENT_NAME (element));
426 /* otherwise we need to decide what kind of cothread */
427 /* if it's not DECOUPLED, we decide based on whether it's a source or not */
428 if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
429 /* if it doesn't have any sinks, it must be a source (duh) */
430 if (element->numsinkpads == 0) {
431 wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
432 GST_DEBUG (GST_CAT_SCHEDULING, "element '%s' is a source, using _src_wrapper\n",
433 GST_ELEMENT_NAME (element));
436 wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chain_wrapper);
437 GST_DEBUG (GST_CAT_SCHEDULING, "element '%s' is a filter, using _chain_wrapper\n",
438 GST_ELEMENT_NAME (element));
443 /* now we have to walk through the pads to set up their state */
444 pads = gst_element_get_pad_list (element);
446 pad = GST_PAD (pads->data);
447 pads = g_list_next (pads);
448 if (!GST_IS_REAL_PAD (pad))
451 /* if the element is DECOUPLED or outside the manager, we have to chain */
/* i.e. no wrapper was selected, or the pad has a peer whose parent element
 * belongs to a different scheduler than this chain */
452 if ((wrapper_function == NULL) ||
453 (GST_RPAD_PEER (pad) &&
454 (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
456 /* set the chain proxies */
/* here the handlers are simply the element's own functions */
457 if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
458 GST_DEBUG (GST_CAT_SCHEDULING, "copying chain function into push proxy for %s:%s\n",
459 GST_DEBUG_PAD_NAME (pad));
460 GST_RPAD_CHAINHANDLER (pad) = GST_RPAD_CHAINFUNC (pad);
463 GST_DEBUG (GST_CAT_SCHEDULING, "copying get function into pull proxy for %s:%s\n",
464 GST_DEBUG_PAD_NAME (pad));
465 GST_RPAD_GETHANDLER (pad) = GST_RPAD_GETFUNC (pad);
466 GST_RPAD_PULLREGIONFUNC (pad) = GST_RPAD_GETREGIONFUNC (pad);
469 /* otherwise we really are a cothread */
/* here the handlers are the proxies that hand buffers over via the pen and cothread switches */
472 if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
473 GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded push proxy for sinkpad %s:%s\n",
474 GST_DEBUG_PAD_NAME (pad));
475 GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
478 GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded pull proxy for srcpad %s:%s\n",
479 GST_DEBUG_PAD_NAME (pad));
480 GST_RPAD_GETHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
481 GST_RPAD_PULLREGIONFUNC (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pullregionfunc_proxy);
486 /* need to set up the cothread now */
487 if (wrapper_function != NULL) {
/* an existing cothread is reused; only its function is set again below */
488 if (element->threadstate == NULL) {
489 /* FIXME handle cothread_create returning NULL */
490 element->threadstate = cothread_create (bin->threadcontext);
491 GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", element->threadstate,
492 GST_ELEMENT_NAME (element));
/* argc is 0 and the element pointer travels in the argv slot; the wrappers cast it back */
494 cothread_setfunc (element->threadstate, wrapper_function, 0, (char **) element);
495 GST_DEBUG (GST_CAT_SCHEDULING, "set wrapper function for '%s' to &%s\n",
496 GST_ELEMENT_NAME (element), GST_DEBUG_FUNCPTR_NAME (wrapper_function));
// Non-cothreaded scheduling of a chain: for every real pad of every element
// in the chain, the pad's handlers are set to the element's own chain / get /
// getregion functions.
// NOTE(review): the parameter type _GstBinChain is not declared anywhere in
// the code visible here, so this function is presumably disabled code --
// confirm before relying on it.
502 G_GNUC_UNUSED static void
503 gst_basic_scheduler_chained_chain (GstBin *bin, _GstBinChain *chain) {
509 GST_DEBUG (GST_CAT_SCHEDULING,"chain entered\n");
510 // walk through all the elements
511 elements = chain->elements;
513 element = GST_ELEMENT (elements->data);
514 elements = g_list_next (elements);
516 // walk through all the pads
517 pads = gst_element_get_pad_list (element);
519 pad = GST_PAD (pads->data);
520 pads = g_list_next (pads);
521 if (!GST_IS_REAL_PAD(pad)) continue;
// sink pads get the chain function, the other pads get the get/getregion functions
523 if (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) {
524 GST_DEBUG (GST_CAT_SCHEDULING,"copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
525 GST_RPAD_CHAINHANDLER(pad) = GST_RPAD_CHAINFUNC(pad);
527 GST_DEBUG (GST_CAT_SCHEDULING,"copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
528 GST_RPAD_GETHANDLER(pad) = GST_RPAD_GETFUNC(pad);
529 GST_RPAD_PULLREGIONFUNC(pad) = GST_RPAD_GETREGIONFUNC(pad);
/*
 * Allocate an empty chain for sched and put it at the front of the
 * scheduler's chain list. The chain starts with no disabled or active
 * elements, zeroed counters and schedule set to FALSE.
 */
537 static GstSchedulerChain *
538 gst_basic_scheduler_chain_new (GstScheduler * sched)
/* g_new() does not clear the memory, so every field is set explicitly below */
540 GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
542 /* initialize the chain with sane values */
543 chain->sched = sched;
544 chain->disabled = NULL;
545 chain->elements = NULL;
546 chain->num_elements = 0;
548 chain->cothreaded_elements = 0;
549 chain->schedule = FALSE;
551 /* add the chain to the schedulers' list of chains */
552 sched->chains = g_list_prepend (sched->chains, chain);
555 GST_INFO (GST_CAT_SCHEDULING, "created new chain %p, now are %d chains in sched %p",
556 chain, sched->num_chains, sched);
/*
 * Remove chain from its scheduler's chain list and free the chain's two
 * element lists. Only the list nodes are freed; the elements themselves are
 * not touched here.
 */
562 gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
/* fetched up front; it is still needed for the log message at the end */
564 GstScheduler *sched = chain->sched;
566 /* remove the chain from the schedulers' list of chains */
567 sched->chains = g_list_remove (sched->chains, chain);
570 /* destroy the chain */
571 g_list_free (chain->disabled); /* should be empty... */
572 g_list_free (chain->elements); /* ditto */
/* chain is only printed as an address (%p) here, it is not dereferenced */
574 GST_INFO (GST_CAT_SCHEDULING, "destroyed chain %p, now are %d chains in sched %p", chain,
575 sched->num_chains, sched);
/*
 * Add element to chain. The element is pointed at the chain's scheduler and
 * starts out on the chain's 'disabled' list; it only moves to the active
 * list through gst_basic_scheduler_chain_enable_element().
 */
581 gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain, GstElement * element)
583 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to chain %p", GST_ELEMENT_NAME (element),
586 /* set the sched pointer for the element */
587 element->sched = chain->sched;
589 /* add the element to the list of 'disabled' elements */
590 chain->disabled = g_list_prepend (chain->disabled, element);
/* num_elements counts disabled and active elements together */
591 chain->num_elements++;
/*
 * Move element from the chain's disabled list to its active list and
 * reschedule the chain with gst_basic_scheduler_cothreaded_chain().
 */
595 gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement * element)
597 GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),
600 /* remove from disabled list */
601 chain->disabled = g_list_remove (chain->disabled, element);
603 /* add to elements list */
604 chain->elements = g_list_prepend (chain->elements, element);
606 /* reschedule the chain */
/* the bin passed along is the parent of the chain's scheduler */
607 gst_basic_scheduler_cothreaded_chain (GST_BIN (chain->sched->parent), chain);
/*
 * Move element from the chain's active list back to its disabled list.
 * Unlike enabling, this does not reschedule the chain: the call is commented
 * out below.
 */
611 gst_basic_scheduler_chain_disable_element (GstSchedulerChain * chain, GstElement * element)
613 GST_INFO (GST_CAT_SCHEDULING, "disabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),
616 /* remove from elements list */
617 chain->elements = g_list_remove (chain->elements, element);
619 /* add to disabled list */
620 chain->disabled = g_list_prepend (chain->disabled, element);
622 /* reschedule the chain */
623 /* FIXME this should be done only if manager state != NULL */
624 /* gst_basic_scheduler_cothreaded_chain(GST_BIN(chain->sched->parent),chain); */
/*
 * Remove element from chain: an active element is disabled first, then the
 * element is taken off the disabled list. A chain left without elements is
 * destroyed. The element's sched pointer is cleared at the end.
 */
628 gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement * element)
630 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from chain %p", GST_ELEMENT_NAME (element),
633 /* if it's active, deactivate it */
634 if (g_list_find (chain->elements, element)) {
635 gst_basic_scheduler_chain_disable_element (chain, element);
638 /* remove the element from the list of elements */
639 chain->disabled = g_list_remove (chain->disabled, element);
640 chain->num_elements--;
642 /* if there are no more elements in the chain, destroy the chain */
643 if (chain->num_elements == 0)
644 gst_basic_scheduler_chain_destroy (chain);
646 /* unset the sched pointer for the element */
/* chain itself is not used after this point */
647 element->sched = NULL;
/*
 * Make sure element1 and element2 end up in the same chain of sched.
 * Cases handled: both already share a chain (nothing to do), neither has a
 * chain (a new chain is created for both), both have different chains
 * (chain2 is merged into chain1 and destroyed), or only one has a chain (the
 * other element is added to it).
 * The chain is not rescheduled here; see the FIXME notes.
 */
651 gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, GstElement * element2)
654 GstSchedulerChain *chain;
655 GstSchedulerChain *chain1 = NULL, *chain2 = NULL;
658 /* first find the chains that hold the two */
/* both the disabled and the active list of every chain are searched */
659 chains = sched->chains;
661 chain = (GstSchedulerChain *) (chains->data);
662 chains = g_list_next (chains);
664 if (g_list_find (chain->disabled, element1))
666 else if (g_list_find (chain->elements, element1))
669 if (g_list_find (chain->disabled, element2))
671 else if (g_list_find (chain->elements, element2))
675 /* first check to see if they're in the same chain, we're done if that's the case */
676 if ((chain1 != NULL) && (chain1 == chain2)) {
677 GST_INFO (GST_CAT_SCHEDULING, "elements are already in the same chain");
681 /* now, if neither element has a chain, create one */
682 if ((chain1 == NULL) && (chain2 == NULL)) {
683 GST_INFO (GST_CAT_SCHEDULING, "creating new chain to hold two new elements");
684 chain = gst_basic_scheduler_chain_new (sched);
685 gst_basic_scheduler_chain_add_element (chain, element1);
686 gst_basic_scheduler_chain_add_element (chain, element2);
687 /* FIXME chain changed here */
688 /* gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
690 /* otherwise if both have chains already, join them */
692 else if ((chain1 != NULL) && (chain2 != NULL)) {
693 GST_INFO (GST_CAT_SCHEDULING, "merging chain %p into chain %p", chain2, chain1);
694 /* take the contents of chain2 and merge them into chain1 */
/* chain2's lists are copied because gst_basic_scheduler_chain_destroy() frees the originals */
695 chain1->disabled = g_list_concat (chain1->disabled, g_list_copy (chain2->disabled));
696 chain1->elements = g_list_concat (chain1->elements, g_list_copy (chain2->elements));
697 chain1->num_elements += chain2->num_elements;
698 /* FIXME chain changed here */
699 /* gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
701 gst_basic_scheduler_chain_destroy (chain2);
703 /* otherwise one has a chain already, the other doesn't */
706 /* pick out which one has the chain, and which doesn't */
708 chain = chain1, element = element2;
710 chain = chain2, element = element1;
712 GST_INFO (GST_CAT_SCHEDULING, "adding element to existing chain");
713 gst_basic_scheduler_chain_add_element (chain, element);
714 /* FIXME chain changed here */
715 /* gst_basic_scheduler_cothreaded_chain(chain->sched->parent,chain); */
720 /* find the chain within the scheduler that holds the element, if any */
/* An element counts as held when it is on a chain's active list or on its
 * disabled list; the scheduler's chains are examined in list order. */
721 static GstSchedulerChain *
722 gst_basic_scheduler_find_chain (GstScheduler * sched, GstElement * element)
725 GstSchedulerChain *chain;
727 GST_INFO (GST_CAT_SCHEDULING, "searching for element \"%s\" in chains",
728 GST_ELEMENT_NAME (element));
730 chains = sched->chains;
732 chain = (GstSchedulerChain *) (chains->data);
733 chains = g_list_next (chains);
/* active list first, then the disabled list */
735 if (g_list_find (chain->elements, element))
737 if (g_list_find (chain->disabled, element))
/*
 * Add element to chain and then, for each of its pads that has a peer,
 * recurse into the peer's parent element when that element belongs to the
 * same scheduler and is not yet part of any chain. This rebuilds a chain
 * from the connections that currently exist.
 */
745 gst_basic_scheduler_chain_recursive_add (GstSchedulerChain * chain, GstElement * element)
749 GstElement *peerelement;
751 /* add the element to the chain */
/* done before recursing, so the find_chain test below stops the recursion from coming back here */
752 gst_basic_scheduler_chain_add_element (chain, element);
754 GST_DEBUG (GST_CAT_SCHEDULING, "recursing on element \"%s\"\n", GST_ELEMENT_NAME (element));
755 /* now go through all the pads and see which peers can be added */
756 pads = element->pads;
758 pad = GST_PAD (pads->data);
759 pads = g_list_next (pads);
761 GST_DEBUG (GST_CAT_SCHEDULING, "have pad %s:%s, checking for valid peer\n",
762 GST_DEBUG_PAD_NAME (pad));
763 /* if the peer exists and could be in the same chain */
764 if (GST_PAD_PEER (pad)) {
765 GST_DEBUG (GST_CAT_SCHEDULING, "has peer %s:%s\n", GST_DEBUG_PAD_NAME (GST_PAD_PEER (pad)));
766 peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
/* only peers managed by the same scheduler qualify */
767 if (GST_ELEMENT_SCHED (GST_PAD_PARENT (pad)) == GST_ELEMENT_SCHED (peerelement)) {
768 GST_DEBUG (GST_CAT_SCHEDULING, "peer \"%s\" is valid for same chain\n",
769 GST_ELEMENT_NAME (peerelement));
770 /* if it's not already in a chain, add it to this one */
771 if (gst_basic_scheduler_find_chain (chain->sched, peerelement) == NULL) {
772 gst_basic_scheduler_chain_recursive_add (chain, peerelement);
780 * Entry points for this scheduler.
783 gst_basic_scheduler_setup (GstScheduler *sched)
/* 'setup' scheduler method: makes sure the scheduler's parent bin has a
 * cothread context, creating one with cothread_init() when it has none. */
785 GstBin *bin = GST_BIN (sched->parent);
787 /* first create thread context */
788 if (bin->threadcontext == NULL) {
789 GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n");
790 bin->threadcontext = cothread_init ();
/*
 * 'reset' scheduler method: clears the cothread state pointer of the
 * scheduler's elements and sets the parent bin's cothread context pointer to
 * NULL.
 * NOTE(review): what is done with ctx between reading and clearing the
 * context pointer is not visible here -- confirm the context is released.
 */
795 gst_basic_scheduler_reset (GstScheduler *sched)
797 cothread_context *ctx;
/* NOTE(review): bin is initialised here, but the visible statements below
 * repeat the GST_BIN (GST_SCHED_PARENT (sched)) expression instead of using it */
798 GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched));
799 GList *elements = sched->elements;
/* forget the elements' cothreads */
802 GST_ELEMENT (elements->data)->threadstate = NULL;
803 elements = g_list_next (elements);
806 ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext;
810 GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL;
/*
 * 'add_element' scheduler method.
 * Takes element over from any previous scheduler, records it in this
 * scheduler's element list, puts it in a fresh chain of its own, sets the
 * scheduler on each of its real pads and chains it together with every
 * connected peer element that is managed by the same scheduler.
 * Bins are not scheduled themselves unless they carry the
 * GST_BIN_SELF_SCHEDULABLE flag.
 */
814 gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
818 GstElement *peerelement;
819 GstSchedulerChain *chain;
821 g_return_if_fail (element != NULL);
822 g_return_if_fail (GST_IS_ELEMENT (element));
824 /* if it's already in this scheduler, don't bother doing anything */
825 if (GST_ELEMENT_SCHED (element) == sched)
828 GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to scheduler", GST_ELEMENT_NAME (element));
830 /* if the element already has a different scheduler, remove the element from it */
/* NOTE(review): this calls the basic scheduler's own remove function directly
 * on the other scheduler object, so it presumably expects that scheduler to
 * be a basic scheduler as well -- confirm */
831 if (GST_ELEMENT_SCHED (element)) {
832 gst_basic_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
835 /* set the sched pointer in the element itself */
836 GST_ELEMENT_SCHED (element) = sched;
838 /* only deal with elements after this point, not bins */
839 /* exception is made for Bin's that are schedulable, like the autoplugger */
840 if (GST_IS_BIN (element) && !GST_FLAG_IS_SET (element, GST_BIN_SELF_SCHEDULABLE))
843 /* first add it to the list of elements that are to be scheduled */
844 sched->elements = g_list_prepend (sched->elements, element);
845 sched->num_elements++;
847 /* create a chain to hold it, and add */
/* the element starts out alone; chains are merged further down when peers are found */
848 chain = gst_basic_scheduler_chain_new (sched);
849 gst_basic_scheduler_chain_add_element (chain, element);
851 /* set the sched pointer in all the pads */
852 pads = element->pads;
854 pad = GST_PAD (pads->data);
855 pads = g_list_next (pads);
857 /* we only operate on real pads */
858 if (!GST_IS_REAL_PAD (pad))
861 /* set the pad's sched pointer */
862 gst_pad_set_sched (pad, sched);
864 /* if the peer element exists and is a candidate */
865 if (GST_PAD_PEER (pad)) {
866 peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
867 if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
868 GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
869 /* make sure that the two elements are in the same chain */
870 gst_basic_scheduler_chain_elements (sched, element, peerelement);
/*
 * 'remove_element' scheduler method.
 * When element is on this scheduler's element list it is removed from its
 * chain and from the list, the element count is decremented and the
 * element's scheduler pointer is cleared.
 */
877 gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
879 GstSchedulerChain *chain;
881 g_return_if_fail (element != NULL);
882 g_return_if_fail (GST_IS_ELEMENT (element));
/* elements that were never listed (e.g. bins skipped by add_element) are left alone */
884 if (g_list_find (sched->elements, element)) {
885 GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler",
886 GST_ELEMENT_NAME (element));
888 /* find what chain the element is in */
889 chain = gst_basic_scheduler_find_chain (sched, element);
891 /* remove it from its chain */
/* NOTE(review): chain is passed on without a visible NULL check, and
 * gst_basic_scheduler_chain_remove_element() dereferences it -- confirm a
 * listed element always has a chain */
892 gst_basic_scheduler_chain_remove_element (chain, element);
894 /* remove it from the list of elements */
895 sched->elements = g_list_remove (sched->elements, element);
896 sched->num_elements--;
898 /* unset the scheduler pointer in the element */
899 GST_ELEMENT_SCHED (element) = NULL;
/*
 * 'enable_element' scheduler method: looks up the chain holding element and
 * enables the element in it. An info message is logged on the path where the
 * element is not found in any chain.
 */
904 gst_basic_scheduler_enable_element (GstScheduler *sched, GstElement *element)
906 GstSchedulerChain *chain;
908 /* find the chain the element's in */
909 chain = gst_basic_scheduler_find_chain (sched, element);
912 gst_basic_scheduler_chain_enable_element (chain, element);
915 GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, not enabling", GST_ELEMENT_NAME (element));
/*
 * 'disable_element' scheduler method: looks up the chain holding element and
 * disables the element in it. An info message is logged on the path where
 * the element is not found in any chain.
 */
920 gst_basic_scheduler_disable_element (GstScheduler *sched, GstElement *element)
922 GstSchedulerChain *chain;
924 /* find the chain the element is in */
925 chain = gst_basic_scheduler_find_chain (sched, element);
927 /* remove it from the chain */
/* the element stays in the chain; it only moves to the chain's disabled list */
929 gst_basic_scheduler_chain_disable_element (chain, element);
932 GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, not disabling", GST_ELEMENT_NAME (element));
/*
 * 'lock_element' scheduler method: locks the element's cothread.
 * Does nothing for an element that has no cothread state; sched is not used.
 */
937 gst_basic_scheduler_lock_element (GstScheduler * sched, GstElement * element)
939 if (element->threadstate)
940 cothread_lock (element->threadstate);
/*
 * 'unlock_element' scheduler method: unlocks the element's cothread.
 * Does nothing for an element that has no cothread state; sched is not used.
 */
944 gst_basic_scheduler_unlock_element (GstScheduler * sched, GstElement * element)
946 if (element->threadstate)
947 cothread_unlock (element->threadstate);
/*
 * 'pad_connect' scheduler method, called for a newly connected
 * srcpad/sinkpad pair. When the parent elements of both pads are managed by
 * the same scheduler they are put into the same chain. Returns early (with a
 * GLib critical) when either pad has no parent.
 */
951 gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad)
953 GstElement *srcelement, *sinkelement;
955 srcelement = GST_PAD_PARENT (srcpad);
956 g_return_if_fail (srcelement != NULL);
957 sinkelement = GST_PAD_PARENT (sinkpad);
958 g_return_if_fail (sinkelement != NULL);
960 GST_INFO (GST_CAT_SCHEDULING, "have pad connected callback on %s:%s to %s:%s",
961 GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
962 GST_DEBUG (GST_CAT_SCHEDULING, "srcpad sched is %p, sinkpad sched is %p\n",
963 GST_ELEMENT_SCHED (srcelement), GST_ELEMENT_SCHED (sinkelement));
/* elements under different schedulers are left in their separate chains */
965 if (GST_ELEMENT_SCHED (srcelement) == GST_ELEMENT_SCHED (sinkelement)) {
966 GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same scheduler, chaining together",
967 GST_DEBUG_PAD_NAME (sinkpad));
968 gst_basic_scheduler_chain_elements (sched, srcelement, sinkelement);
/*
 * 'pad_disconnect' scheduler method, called when srcpad and sinkpad are
 * disconnected. The chain(s) holding the two parent elements are destroyed
 * and new chains are rebuilt from the remaining connections with
 * gst_basic_scheduler_chain_recursive_add(), starting from each element.
 * NOTE(review): the conditions guarding the destroy/rebuild steps are not
 * visible here -- confirm them before changing this function.
 */
973 gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad)
975 GstSchedulerChain *chain;
976 GstElement *element1, *element2;
977 GstSchedulerChain *chain1, *chain2;
979 GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
980 GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
982 /* we need to have the parent elements of each pad */
983 element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
984 element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
986 /* first task is to remove the old chain they belonged to.
987 * this can be accomplished by taking either of the elements,
988 * since they are guaranteed to be in the same chain
989 * FIXME is it potentially better to make an attempt at splitting cleaner??
991 chain1 = gst_basic_scheduler_find_chain (sched, element1);
992 chain2 = gst_basic_scheduler_find_chain (sched, element2);
/* the case of two different chains is only logged here */
994 if (chain1 != chain2) {
995 GST_INFO (GST_CAT_SCHEDULING, "elements not in the same chain");
1000 GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1001 gst_basic_scheduler_chain_destroy (chain1);
1003 /* now create a new chain to hold element1 and build it from scratch */
1004 chain1 = gst_basic_scheduler_chain_new (sched);
1005 gst_basic_scheduler_chain_recursive_add (chain1, element1);
1008 /* check the other element to see if it landed in the newly created chain */
1010 GST_INFO (GST_CAT_SCHEDULING, "destroying chain");
1011 gst_basic_scheduler_chain_destroy (chain2);
1013 /* if not in chain, create chain and build from scratch */
1014 chain2 = gst_basic_scheduler_chain_new (sched);
1015 gst_basic_scheduler_chain_recursive_add (chain2, element2);
/*
 * 'pad_select' scheduler method: pick a pad from padlist that has data.
 * The pads are first checked with gst_pad_peek(). For the case where nothing
 * is ready, the select proxy is installed as chain handler on the pads, the
 * cothread of a peer's parent element is switched to, and on return the pad
 * recorded in the parent element's select_pad field (set by
 * gst_basic_scheduler_select_proxy()) is picked up.
 * NOTE(review): the g_print() calls print unconditionally and look like
 * leftover debugging -- confirm.
 */
1020 gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist)
/* second cursor, so padlist still points at the head for the later pass */
1023 GList *padlist2 = padlist;
1025 GST_INFO (GST_CAT_SCHEDULING, "performing select");
1028 pad = GST_PAD (padlist2->data);
1030 if (gst_pad_peek (pad)) {
1031 g_print ("found something in pad %s:%s\n", GST_DEBUG_PAD_NAME (pad));
1035 padlist2 = g_list_next (padlist2);
1038 /* else there is nothing ready to consume, set up the select functions */
1040 pad = GST_PAD (padlist->data);
1042 GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_select_proxy);
1044 padlist = g_list_next (padlist);
/* pad is whatever the preceding statements left it as; its peer's element is run to produce data */
1047 GstRealPad *peer = GST_RPAD_PEER (pad);
1049 cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
1051 g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
1052 gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
/* the select proxy stored the pad that received data in select_pad */
1053 pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
1055 g_assert (pad != NULL);
1056 g_print ("back from select (%s:%s)\n", GST_DEBUG_PAD_NAME (pad));
/*
 * gst_basic_scheduler_iterate:
 * @sched: scheduler to iterate; sched->parent is used as the managing bin
 *
 * Runs one iteration over the chains in sched->chains.
 *
 * For a chain whose element list is non-empty, the elements are scanned for
 * an entry point: elements flagged GST_ELEMENT_DECOUPLED or
 * GST_ELEMENT_NO_ENTRY are logged as skipped.  The chosen entry gets
 * GST_ELEMENT_COTHREAD_STOPPING set and is started with cothread_switch()
 * on its threadstate.  When the switch comes back, the pending state of the
 * scheduler's parent is compared against GST_STATE_VOID_PENDING and a
 * pending state is logged.
 *
 * "NO ENTRY INTO CHAIN!" and "NO ENABLED ELEMENTS IN CHAIN!!" are the
 * messages for the cases where no entry is used / the chain has no elements.
 *
 * Early exits: the g_return_val_if_fail() checks yield TRUE when the parent
 * is NULL or not a GstBin, and FALSE when the scheduler has no chains.
 *
 * NOTE(review): 'eos' starts out FALSE; presumably it feeds the final
 * return value - confirm against the return statement.
 */
1062 gst_basic_scheduler_iterate (GstScheduler * sched)
1064 GstBin *bin = GST_BIN (sched->parent);
1066 GstSchedulerChain *chain;
1068 gboolean eos = FALSE;
/* NOTE(review): GST_ELEMENT_NAME (bin) is evaluated here, before the
 * bin != NULL check below - confirm this is safe when the debug macro is
 * enabled, or move the checks first */
1071 GST_DEBUG_ENTER ("(\"%s\")", GST_ELEMENT_NAME (bin));
1073 g_return_val_if_fail (bin != NULL, TRUE);
1074 g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
1076 /* step through all the chains */
1077 chains = sched->chains;
1079 g_return_val_if_fail (chains != NULL, FALSE);
/* take the current chain and advance the cursor before doing any work on it */
1082 chain = (GstSchedulerChain *) (chains->data);
1083 chains = g_list_next (chains);
1085 /* all we really have to do is switch to the first child */
1086 /* FIXME this should be lots more intelligent about where to start */
1087 GST_DEBUG (GST_CAT_DATAFLOW, "starting iteration via cothreads\n");
1089 if (chain->elements) {
1090 entry = NULL; /*MattH ADDED?*/
1091 GST_DEBUG (GST_CAT_SCHEDULING, "there are %d elements in this chain\n", chain->num_elements);
1092 elements = chain->elements;
/* candidate entry point: decoupled and no-entry elements are rejected below */
1094 entry = GST_ELEMENT (elements->data);
1095 elements = g_list_next (elements);
1096 if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
1097 GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is DECOUPLED, skipping\n",
1098 GST_ELEMENT_NAME (entry));
1101 else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_NO_ENTRY)) {
1102 GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is not valid, skipping\n",
1103 GST_ELEMENT_NAME (entry));
/* flag the entry element, then hand control to its cothread */
1110 GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
1111 GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
1112 GST_ELEMENT_NAME (entry), entry);
1113 cothread_switch (entry->threadstate);
1115 /* following is a check to see if the chain was interrupted due to a
1116 * top-half state_change(). (i.e., if there's a pending state.)
1118 * if it was, return to gstthread.c::gst_thread_main_loop() to
1119 * execute the state change.
1121 GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch ended or interrupted\n");
1122 if (GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) != GST_STATE_VOID_PENDING) {
1123 GST_DEBUG (GST_CAT_DATAFLOW, "handle pending state %d\n",
1124 GST_STATE_PENDING (GST_SCHEDULER (sched)->parent));
1130 GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!");
1135 GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!");
1140 GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
1146 gst_basic_scheduler_show (GstScheduler * sched)
1148 GList *chains, *elements;
1149 GstElement *element;
1150 GstSchedulerChain *chain;
1152 if (sched == NULL) {
1153 g_print ("scheduler doesn't exist for this element\n");
1157 g_return_if_fail (GST_IS_SCHEDULER (sched));
1159 g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n", GST_ELEMENT_NAME (sched->parent));
1161 g_print ("scheduler has %d elements in it: ", sched->num_elements);
1162 elements = sched->elements;
1164 element = GST_ELEMENT (elements->data);
1165 elements = g_list_next (elements);
1167 g_print ("%s, ", GST_ELEMENT_NAME (element));
1171 g_print ("scheduler has %d chains in it\n", sched->num_chains);
1172 chains = sched->chains;
1174 chain = (GstSchedulerChain *) (chains->data);
1175 chains = g_list_next (chains);
1177 g_print ("%p: ", chain);
1179 elements = chain->disabled;
1181 element = GST_ELEMENT (elements->data);
1182 elements = g_list_next (elements);
1184 g_print ("!%s, ", GST_ELEMENT_NAME (element));
1187 elements = chain->elements;
1189 element = GST_ELEMENT (elements->data);
1190 elements = g_list_next (elements);
1192 g_print ("%s, ", GST_ELEMENT_NAME (element));