gstmeta.c \
gsttee.c \
gstxml.c \
- cothreads.c
+ cothreads.c \
+ gstscheduler.c
libgstincludedir = $(includedir)/gst
libgstinclude_HEADERS = \
gsttee.h \
gstxml.h \
gstdebug.h \
- cothreads.h
+ cothreads.h \
+ gstscheduler.h
noinst_HEADERS = \
gstarch.h \
do {
buf = gst_pad_pull (identity->sinkpad);
+ g_print("(%s:%s)i ",GST_DEBUG_PAD_NAME(identity->sinkpad));
gst_pad_push (identity->srcpad, buf);
ARG_0,
ARG_LEVEL,
ARG_MAX_LEVEL,
+ ARG_BLOCK,
};
GTK_ARG_READABLE, ARG_LEVEL);
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
+ gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL,
+ GTK_ARG_READWRITE, ARG_BLOCK);
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
static void
gst_queue_init (GstQueue *queue)
{
- GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY);
+ // scheduling on this kind of element is, well, interesting
+ GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
queue->queue = NULL;
queue->level_buffers = 0;
queue->max_buffers = 20;
+ queue->block = TRUE;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
DEBUG("queue: %s have queue lock\n", name);
+ // we bail if there's nothing there
+ if (!queue->level_buffers && !queue->block) {
+ GST_UNLOCK(queue);
+ return NULL;
+ }
+
while (!queue->level_buffers) {
STATUS("queue: %s U released lock\n");
GST_UNLOCK (queue);
case ARG_MAX_LEVEL:
queue->max_buffers = GTK_VALUE_INT (*arg);
break;
+ case ARG_BLOCK:
+ queue->block = GTK_VALUE_BOOL (*arg);
+ break;
default:
break;
}
case ARG_MAX_LEVEL:
GTK_VALUE_INT (*arg) = queue->max_buffers;
break;
+ case ARG_BLOCK:
+ GTK_VALUE_BOOL (*arg) = queue->block;
+ break;
default:
arg->type = GTK_TYPE_INVALID;
break;
gint level_buffers; /* number of buffers queued here */
gint max_buffers; /* maximum number of buffers queued here */
+ gboolean block; /* if set to FALSE, _get returns NULL if queue empty */
gint level_bytes; /* number of bytes queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */
#include "gstsrc.h"
#include "gstconnection.h"
+#include "gstscheduler.h"
+
GstElementDetails gst_bin_details = {
"Generic bin",
"Bin",
GtkType type);
static void gst_bin_create_plan_func (GstBin *bin);
+//static void gst_bin_schedule_func (GstBin *bin);
static void gst_bin_iterate_func (GstBin *bin);
static xmlNodePtr gst_bin_save_thyself (GstElement *element, xmlNodePtr parent);
klass->change_state_type = gst_bin_change_state_type;
klass->create_plan = gst_bin_create_plan_func;
+ klass->schedule = gst_bin_schedule_func;
klass->iterate = gst_bin_iterate_func;
gstelement_class->change_state = gst_bin_change_state;
(oclass->create_plan) (bin);
}
-typedef struct {
- gulong offset;
- gulong size;
-} region_struct;
-
-static int
-gst_bin_loopfunc_wrapper (int argc,char *argv[])
-{
- GstElement *element = GST_ELEMENT (argv);
- G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
-
- DEBUG_ENTER("(%d,'%s')",argc,name);
-
- do {
- DEBUG("calling loopfunc %s for element %s\n",
- GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
- (element->loopfunc) (element);
- DEBUG("element %s ended loop function\n", name);
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
- GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
-
- DEBUG_LEAVE("(%d,'%s')",argc,name);
- return 0;
-}
-
-static int
-gst_bin_chain_wrapper (int argc,char *argv[])
-{
- GstElement *element = GST_ELEMENT (argv);
- G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
- GList *pads;
- GstPad *pad;
- GstBuffer *buf;
-
- DEBUG_ENTER("(\"%s\")",name);
- DEBUG("stepping through pads\n");
- do {
- pads = element->pads;
- while (pads) {
- pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
- if (pad->direction == GST_PAD_SINK) {
- DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
- buf = gst_pad_pull (pad);
- DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
- (pad->chainfunc) (pad,buf);
- DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
- }
- }
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
- GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
-
- DEBUG_LEAVE("(%d,'%s')",argc,name);
- return 0;
-}
-
-static int
-gst_bin_src_wrapper (int argc,char *argv[])
-{
- GstElement *element = GST_ELEMENT (argv);
- GList *pads;
- GstPad *pad;
- GstBuffer *buf;
- G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
-
- DEBUG_ENTER("(%d,\"%s\")",argc,name);
-
- do {
- pads = element->pads;
- while (pads) {
- pad = GST_PAD (pads->data);
- if (pad->direction == GST_PAD_SRC) {
- region_struct *region = cothread_get_data (element->threadstate, "region");
- DEBUG("calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- if (region) {
- //gst_src_push_region (GST_SRC (element), region->offset, region->size);
- if (pad->getregionfunc == NULL)
- fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
- buf = (pad->getregionfunc)(pad, region->offset, region->size);
- } else {
- if (pad->getfunc == NULL)
- fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
- buf = (pad->getfunc)(pad);
- }
-
- DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- gst_pad_push (pad, buf);
- }
- pads = g_list_next(pads);
- }
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
- GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
-
- DEBUG_LEAVE("");
- return 0;
-}
-
-static void
-gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
-{
- cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate;
- DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
- DEBUG("putting buffer %p in peer's pen\n",buf);
- pad->peer->bufpen = buf;
- DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate));
- cothread_switch (threadstate);
- DEBUG("done switching\n");
-}
-
-static GstBuffer*
-gst_bin_pullfunc_proxy (GstPad *pad)
+/**
+ * gst_bin_schedule:
+ * @bin: #GstBin to schedule
+ *
+ * let the bin figure out how to handle the plugins in it.
+ */
+void
+gst_bin_schedule (GstBin *bin)
{
- GstBuffer *buf;
+ GstBinClass *oclass;
- cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate;
- DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
- if (pad->bufpen == NULL) {
- DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate));
- cothread_switch (threadstate);
- }
- DEBUG("done switching\n");
- buf = pad->bufpen;
- pad->bufpen = NULL;
- return buf;
-}
+ oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass);
-static GstBuffer *
-gst_bin_chainfunc_proxy (GstPad *pad)
-{
- GstBuffer *buf;
+ if (oclass->schedule)
+ (oclass->schedule) (bin);
}
-// FIXME!!!
-static void
-gst_bin_pullregionfunc_proxy (GstPad *pad,
- gulong offset,
- gulong size)
-{
- region_struct region;
- cothread_state *threadstate;
-
- DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size);
-
- region.offset = offset;
- region.size = size;
-
- threadstate = GST_ELEMENT(pad->parent)->threadstate;
- cothread_set_data (threadstate, "region", ®ion);
- cothread_switch (threadstate);
- cothread_set_data (threadstate, "region", NULL);
-}
+typedef struct {
+ gulong offset;
+ gulong size;
+} region_struct;
static void
GList *elements;
GstElement *element;
const gchar *elementname;
- GSList *pending_bins = NULL;
+ GSList *pending = NULL;
GstBin *pending_bin;
- GList *pads;
- GstPad *pad;
- GstElement *peer_manager;
- cothread_func wrapper_function;
+// GList *pads;
+// GstPad *pad;
+// GstElement *peer_manager;
+// cothread_func wrapper_function;
+// _GstBinChain *chain;
+// GList *chains;
DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_ENTER_STRING;
// if it's a loop-based element, use cothreads
if (element->loopfunc != NULL) {
DEBUG("requiring cothreads because \"%s\" is a loop-based element\n",elementname);
- bin->need_cothreads = TRUE;
+ GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD);
// if it's a 'complex' element, use cothreads
} else if (GST_FLAG_IS_SET (element, GST_ELEMENT_COMPLEX)) {
DEBUG("requiring cothreads because \"%s\" is complex\n",elementname);
- bin->need_cothreads = TRUE;
+ GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD);
// if the element has more than one sink pad, use cothreads
} else if (element->numsinkpads > 1) {
DEBUG("requiring cothreads because \"%s\" has more than one sink pad\n",elementname);
- bin->need_cothreads = TRUE;
+ GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD);
}
+ if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
+ bin->need_cothreads = TRUE;
}
}
// find all the managed children
// here we pull off the trick of walking an entire arbitrary tree without recursion
DEBUG("attempting to find all the elements to manage\n");
- pending_bins = g_slist_prepend (pending_bins, bin);
+ pending = g_slist_prepend (pending, bin);
do {
// retrieve the top of the stack and pop it
- pending_bin = GST_BIN (pending_bins->data);
- pending_bins = g_slist_remove (pending_bins, pending_bin);
+ pending_bin = GST_BIN (pending->data);
+ pending = g_slist_remove (pending, pending_bin);
// walk the list of elements, find bins, and do stuff
DEBUG("checking Bin \"%s\" for managed elements\n",
// if it's a Bin, add it to the list of Bins to check
if (GST_IS_BIN (element)) {
DEBUG("flattened recurse into \"%s\"\n",elementname);
- pending_bins = g_slist_prepend (pending_bins, element);
+ pending = g_slist_prepend (pending, element);
// otherwise add it to the list of elements
} else {
DEBUG("found element \"%s\" that I manage\n",elementname);
}
}
}
- } while (pending_bins);
+ } while (pending);
DEBUG("have %d elements to manage, implementing plan\n",bin->num_managed_elements);
- // If cothreads are needed, we need to not only find elements but
- // set up cothread states and various proxy functions.
- if (bin->need_cothreads) {
- DEBUG("bin is using cothreads\n");
-
- // first create thread context
- if (bin->threadcontext == NULL) {
- DEBUG("initializing cothread context\n");
- bin->threadcontext = cothread_init ();
- }
-
- // walk through all the children
- elements = bin->managed_elements;
- while (elements) {
- element = GST_ELEMENT (elements->data);
- elements = g_list_next (elements);
-
- // start out with a NULL warpper function, we'll set it if we want a cothread
- wrapper_function = NULL;
-
- // have to decide if we need to or can use a cothreads, and if so which wrapper
- // first of all, if there's a loopfunc, the decision's already made
- if (element->loopfunc != NULL) {
- wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
- DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element));
- } else {
- // otherwise we need to decide if it needs a cothread
- // if it's complex, or cothreads are preferred and it's *not* passive, cothread it
- if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
- (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
- !GST_FLAG_IS_SET (element,GST_ELEMENT_SCHEDULE_PASSIVELY))) {
- // base it on whether we're going to loop through source or sink pads
- if (element->numsinkpads == 0)
- wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
- else
- wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
- }
- }
-
- // walk through the all the pads for this element, setting proxy functions
- // the selection of proxy functions depends on whether we're in a cothread or not
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- // check to see if someone else gets to set up the element
- peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
- if (peer_manager != GST_ELEMENT(bin)) {
- DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
- }
-
- // if the wrapper_function is set, we need to use the proxy functions
- if (wrapper_function != NULL) {
- // set up proxy functions
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
- } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
- DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
- }
- } else {
- // otherwise we need to set up for 'traditional' chaining
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- // we can just copy the chain function, since it shares the prototype
- DEBUG("copying chain function into push proxy for %s:%s\n",
- GST_DEBUG_PAD_NAME(pad));
- pad->pushfunc = pad->chainfunc;
- } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
- // we can just copy the get function, since it shares the prototype
- DEBUG("copying get function into pull proxy for %s:%s\n",
- GST_DEBUG_PAD_NAME(pad));
- pad->pullfunc = pad->getfunc;
- }
- }
- }
-
- // if a loopfunc has been specified, create and set up a cothread
- if (wrapper_function != NULL) {
- if (element->threadstate == NULL) {
- element->threadstate = cothread_create (bin->threadcontext);
- DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate,
- &element->threadstate,gst_element_get_name(element));
- }
- cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
- DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element),
- GST_DEBUG_FUNCPTR_NAME(wrapper_function));
- }
-
-// // HACK: if the element isn't passive, it's an entry
-// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_SCHEDULE_PASSIVELY))
-// bin->entries = g_list_append(bin->entries, element);
- }
-
- // otherwise, cothreads are not needed
- } else {
- DEBUG("bin is chained, no cothreads needed\n");
-
- elements = bin->managed_elements;
- while (elements) {
- element = GST_ELEMENT (elements->data);
- elements = g_list_next (elements);
-
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- pad->pushfunc = pad->chainfunc;
- } else {
- DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- pad->pullfunc = pad->getfunc;
- }
- }
- }
- }
+ gst_bin_schedule(bin);
DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
}
void
gst_bin_iterate_func (GstBin *bin)
{
+ GList *chains;
+ _GstBinChain *chain;
GList *entries;
GstElement *entry;
GList *pads;
g_return_if_fail (GST_IS_BIN (bin));
g_return_if_fail (GST_STATE (bin) == GST_STATE_PLAYING);
- if (bin->need_cothreads) {
- // all we really have to do is switch to the first child
- // FIXME this should be lots more intelligent about where to start
- DEBUG("starting iteration via cothreads\n");
+ // step through all the chains
+ chains = bin->chains;
+ while (chains) {
+ chain = (_GstBinChain *)(chains->data);
+ chains = g_list_next (chains);
- entry = GST_ELEMENT (bin->managed_elements->data);
- GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
- DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
- gst_element_get_name(entry),entry);
- cothread_switch (entry->threadstate);
+ if (chain->use_cothreads) {
+ // all we really have to do is switch to the first child
+ // FIXME this should be lots more intelligent about where to start
+ DEBUG("starting iteration via cothreads\n");
- } else {
- DEBUG("starting iteration via chain-functions\n");
+ entry = GST_ELEMENT (chain->elements->data);
+ GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
+ DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
+ gst_element_get_name(entry),entry);
+ cothread_switch (entry->threadstate);
- if (bin->num_entries <= 0) {
- DEBUG("no entries in bin \"%s\", trying managed elements...\n",
- gst_element_get_name(GST_ELEMENT(bin)));
- // we will try loop over the elements then...
- entries = bin->managed_elements;
- }
- else {
- entries = bin->entries;
- }
-
- g_assert (entries != NULL);
-
- while (entries) {
- entry = GST_ELEMENT (entries->data);
- entries = g_list_next (entries);
-
- DEBUG("have entry \"%s\"\n",gst_element_get_name(entry));
-
- if (GST_IS_SRC (entry) || GST_IS_CONNECTION (entry)) {
- pads = entry->pads;
- while (pads) {
- pad = GST_PAD (pads->data);
- if (pad->direction == GST_PAD_SRC) {
- DEBUG("calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- if (pad->getfunc == NULL)
- fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry));
- else
- buf = (pad->getfunc)(pad);
- gst_pad_push(pad,buf);
+ } else {
+ DEBUG("starting iteration via chain-functions\n");
+
+ entries = chain->entries;
+
+ g_assert (entries != NULL);
+
+ while (entries) {
+ entry = GST_ELEMENT (entries->data);
+ entries = g_list_next (entries);
+
+ DEBUG("have entry \"%s\"\n",gst_element_get_name(entry));
+
+ if (GST_IS_SRC (entry) || GST_IS_CONNECTION (entry)) {
+ pads = entry->pads;
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ if (pad->direction == GST_PAD_SRC) {
+ DEBUG("calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ if (pad->getfunc == NULL)
+ fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry));
+ else
+ buf = (pad->getfunc)(pad);
+ gst_pad_push(pad,buf);
+ }
+ pads = g_list_next (pads);
}
- pads = g_list_next (pads);
+// } else if (GST_IS_CONNECTION (entry)) {
+// gst_connection_push (GST_CONNECTION (entry));
+ } else if (GST_IS_BIN (entry))
+ gst_bin_iterate (GST_BIN (entry));
+ else {
+ fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry));
+// g_assert_not_reached ();
}
-// } else if (GST_IS_CONNECTION (entry)) {
-// gst_connection_push (GST_CONNECTION (entry));
- } else if (GST_IS_BIN (entry))
- gst_bin_iterate (GST_BIN (entry));
- else {
- fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry));
-// g_assert_not_reached ();
}
}
}
DEBUG_LEAVE("(%s)", gst_element_get_name (GST_ELEMENT (bin)));
}
-
-
-
-/*
- // ***** check for possible connections outside
- // get the pad's peer
- peer = gst_pad_get_peer (pad);
- // FIXME this should be an error condition, if not disabled
- if (!peer) break;
- // get the parent of the peer of the pad
- outside = GST_ELEMENT (gst_pad_get_parent (peer));
- // FIXME this should *really* be an error condition
- if (!outside) break;
- // if it's a source or connection and it's not ours...
- if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
- (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("dealing with outside source element %s\n",gst_element_get_name(outside));
-// DEBUG("PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
-//GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
-// pad->pullfunc = pad->peer->pullfunc;
-// DEBUG("PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
-// pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy);
- pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
- }
- } else {
-*/
-
-
-
-
-
-/*
- } else if (GST_IS_SRC (element)) {
- DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
- bin->entries = g_list_prepend (bin->entries,element);
- bin->num_entries++;
- cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
- }
-
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD(pads->data);
-
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- // set the proxy functions
- pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
- DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
- } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
- DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- // set the proxy functions
- pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
- DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
- &pad->pullfunc,gst_bin_pullfunc_proxy);
- pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
- }
- pads = g_list_next (pads);
- }
- elements = g_list_next (elements);
-
- // if there are no entries, we have to pick one at random
- if (bin->num_entries == 0)
- bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
- }
- } else {
- DEBUG("don't need cothreads, looking for entry points\n");
- // we have to find which elements will drive an iteration
- elements = bin->children;
- while (elements) {
- element = GST_ELEMENT (elements->data);
- DEBUG("found element \"%s\"\n", gst_element_get_name (element));
- if (GST_IS_BIN (element)) {
- gst_bin_create_plan (GST_BIN (element));
- }
- if (GST_IS_SRC (element)) {
- DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
- bin->entries = g_list_prepend (bin->entries, element);
- bin->num_entries++;
- }
-
- // go through all the pads, set pointers, and check for connections
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD (pads->data);
-
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
-
- // copy the peer's chain function, easy enough
- DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
- pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc);
-
- // need to walk through and check for outside connections
-//FIXME need to do this for all pads
- // get the pad's peer
- peer = gst_pad_get_peer (pad);
- if (!peer) {
- DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad));
- break;
- }
- // get the parent of the peer of the pad
- outside = GST_ELEMENT (gst_pad_get_parent (peer));
- if (!outside) break;
- // if it's a connection and it's not ours...
- if (GST_IS_CONNECTION (outside) &&
- (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
- gst_info("gstbin: element \"%s\" is the external source Connection "
- "for internal element \"%s\"\n",
- gst_element_get_name (GST_ELEMENT (outside)),
- gst_element_get_name (GST_ELEMENT (element)));
- bin->entries = g_list_prepend (bin->entries, outside);
- bin->num_entries++;
- }
- }
- else {
- DEBUG("found pad %s\n", gst_pad_get_name (pad));
- }
- pads = g_list_next (pads);
-
- }
- elements = g_list_next (elements);
- }
-*/
-
typedef struct _GstBin GstBin;
typedef struct _GstBinClass GstBinClass;
+typedef struct __GstBinChain _GstBinChain;
struct _GstBin {
GstElement element;
gboolean need_cothreads;
GList *managed_elements;
gint num_managed_elements;
+
+ GList *chains;
+ gint num_chains;
GList *entries;
gint num_entries;
cothread_context *threadcontext;
gboolean use_cothreads;
- GList *outside_schedules;
};
struct _GstBinClass {
GtkType type);
/* create a plan for the execution of the bin */
void (*create_plan) (GstBin *bin);
+ void (*schedule) (GstBin *bin);
/* run a full iteration of operation */
void (*iterate) (GstBin *bin);
};
+struct __GstBinChain {
+ GList *elements;
+ gint num_elements;
+
+ GList *entries;
+
+ gboolean use_cothreads;
+
+ GstElement *entry;
+};
GtkType gst_bin_get_type (void);
GList* gst_bin_get_list (GstBin *bin);
void gst_bin_create_plan (GstBin *bin);
+void gst_bin_schedule (GstBin *bin);
gboolean gst_bin_set_state_type (GstBin *bin,
GstElementState state,
GtkType type);
(_debug_string != NULL) ? \
fprintf(stderr,GST_DEBUG_PREFIX("%s: "format , _debug_string , ## args )) : \
fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args ))
+#define DEBUG_NOPREFIX(format,args...) fprintf(stderr,format , ## args )
#define DEBUG_ENTER(format, args...) \
fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args ))
#define DEBUG_SET_STRING(format, args...) \
#define DEBUG_LEAVE_STRING DEBUG_LEAVE("%s",_debug_string)
#else
#define DEBUG(format, args...)
+#define DEBUG_NOPREFIX(format, args...)
#define DEBUG_ENTER(format, args...)
#define DEBUG_LEAVE(format, args...)
#define DEBUG_SET_STRING(format, args...)
typedef enum {
// element is complex (for some def.) and generally require a cothread
GST_ELEMENT_COMPLEX = GST_OBJECT_FLAG_LAST,
- // not to be scheduled directly, let others trigger all events
- GST_ELEMENT_SCHEDULE_PASSIVELY,
+ // input and output pads aren't directly coupled to each other
+ // examples: queues, multi-output async readers, etc.
+ GST_ELEMENT_DECOUPLED,
// this element should be placed in a thread if at all possible
GST_ELEMENT_THREAD_SUGGESTED,
// this element is incable of seeking (FIXME: does this apply to filters?)
GST_ELEMENT_NEW_LOOPFUNC,
// the cothread holding this element needs to be stopped
GST_ELEMENT_COTHREAD_STOPPING,
+ // the element has to be scheduled as a cothread for any sanity
+ GST_ELEMENT_USE_COTHREAD,
/* use some padding for future expansion */
GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 8,
--- /dev/null
+/* Gnome-Streamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#define GST_DEBUG_ENABLED
+
+#include "gstscheduler.h"
+#include "gstdebug.h"
+
+
+static int
+gst_bin_loopfunc_wrapper (int argc,char *argv[])
+{
+ GstElement *element = GST_ELEMENT (argv);
+ G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
+
+ DEBUG_ENTER("(%d,'%s')",argc,name);
+
+ do {
+ DEBUG("calling loopfunc %s for element %s\n",
+ GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
+ (element->loopfunc) (element);
+ DEBUG("element %s ended loop function\n", name);
+ } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
+
+ DEBUG_LEAVE("(%d,'%s')",argc,name);
+ return 0;
+}
+
+static int
+gst_bin_chain_wrapper (int argc,char *argv[])
+{
+ GstElement *element = GST_ELEMENT (argv);
+ G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
+ GList *pads;
+ GstPad *pad;
+ GstBuffer *buf;
+
+ DEBUG_ENTER("(\"%s\")",name);
+ DEBUG("stepping through pads\n");
+ do {
+ pads = element->pads;
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+ if (pad->direction == GST_PAD_SINK) {
+ DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
+ buf = gst_pad_pull (pad);
+ DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
+ (pad->chainfunc) (pad,buf);
+ DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
+ }
+ }
+ } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
+
+ DEBUG_LEAVE("(%d,'%s')",argc,name);
+ return 0;
+}
+
+static int
+gst_bin_src_wrapper (int argc,char *argv[])
+{
+ GstElement *element = GST_ELEMENT (argv);
+ GList *pads;
+ GstPad *pad;
+ GstBuffer *buf;
+ G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
+
+ DEBUG_ENTER("(%d,\"%s\")",argc,name);
+
+ do {
+ pads = element->pads;
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ if (pad->direction == GST_PAD_SRC) {
+// region_struct *region = cothread_get_data (element->threadstate, "region");
+ DEBUG("calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+// if (region) {
+ //gst_src_push_region (GST_SRC (element), region->offset, region->size);
+// if (pad->getregionfunc == NULL)
+// fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
+// buf = (pad->getregionfunc)(pad, region->offset, region->size);
+// } else {
+ if (pad->getfunc == NULL)
+ fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
+ buf = (pad->getfunc)(pad);
+// }
+
+ DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ gst_pad_push (pad, buf);
+ }
+ pads = g_list_next(pads);
+ }
+ } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
+
+ DEBUG_LEAVE("");
+ return 0;
+}
+
+static void
+gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
+{
+ cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate;
+ DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
+ DEBUG("putting buffer %p in peer's pen\n",buf);
+ pad->peer->bufpen = buf;
+ DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate));
+ cothread_switch (threadstate);
+ DEBUG("done switching\n");
+}
+
+static GstBuffer*
+gst_bin_pullfunc_proxy (GstPad *pad)
+{
+ GstBuffer *buf;
+
+ cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate;
+ DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
+ if (pad->bufpen == NULL) {
+ DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate));
+ cothread_switch (threadstate);
+ }
+ DEBUG("done switching\n");
+ buf = pad->bufpen;
+ pad->bufpen = NULL;
+ return buf;
+}
+
+static GstBuffer *
+gst_bin_chainfunc_proxy (GstPad *pad)
+{
+// FIXME!!
+// GstBuffer *buf;
+ return NULL;
+}
+
+// FIXME!!!
+static void
+gst_bin_pullregionfunc_proxy (GstPad *pad,
+ gulong offset,
+ gulong size)
+{
+// region_struct region;
+ cothread_state *threadstate;
+
+ DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size);
+
+// region.offset = offset;
+// region.size = size;
+
+// threadstate = GST_ELEMENT(pad->parent)->threadstate;
+// cothread_set_data (threadstate, "region", ®ion);
+ cothread_switch (threadstate);
+// cothread_set_data (threadstate, "region", NULL);
+}
+
+
+static void
+gst_schedule_cothreaded_chain (GstBin *bin, _GstBinChain *chain) {
+ GList *elements;
+ GstElement *element;
+ cothread_func wrapper_function;
+ GList *pads;
+ GstPad *pad;
+
+ DEBUG("chain is using cothreads\n");
+
+ // first create thread context
+ if (bin->threadcontext == NULL) {
+ DEBUG("initializing cothread context\n");
+ bin->threadcontext = cothread_init ();
+ }
+
+ // walk through all the chain's elements
+ elements = chain->elements;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ elements = g_list_next (elements);
+
+ // start out without a wrapper function, we select it later
+ wrapper_function = NULL;
+
+ // if the element has a loopfunc...
+ if (element->loopfunc != NULL) {
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
+ DEBUG("\nelement '%s' is a loop-based\n",gst_element_get_name(element));
+ } else {
+ // otherwise we need to decide what kind of cothread
+ // if it's not DECOUPLED, we decide based on whether it's a source or not
+ if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ // if it doesn't have any sinks, it must be a source (duh)
+ if (element->numsinkpads == 0) {
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
+ DEBUG("\nelement '%s' is a source, using _src_wrapper\n",gst_element_get_name(element));
+ } else {
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
+ DEBUG("\nelement '%s' is a filter, using _chain_wrapper\n",gst_element_get_name(element));
+ }
+ }
+ }
+
+ // now we have to walk through the pads to set up their state
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+
+ // if the element is DECOUPLED or outside the manager, we have to chain
+ if ((wrapper_function == NULL) ||
+ (GST_ELEMENT(pad->peer->parent)->manager != GST_ELEMENT(bin))) {
+ // set the chain proxies
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = pad->chainfunc;
+ } else {
+ DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = pad->getfunc;
+ pad->pullregionfunc = pad->getregionfunc;
+ }
+
+ // otherwise we really are a cothread
+ } else {
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
+ } else {
+ DEBUG("setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ }
+ }
+ }
+
+ // need to set up the cothread now
+ if (wrapper_function != NULL) {
+ if (element->threadstate == NULL) {
+ element->threadstate = cothread_create (bin->threadcontext);
+ DEBUG("created cothread %p for '%s'\n",element->threadstate,gst_element_get_name(element));
+ }
+ cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
+ DEBUG("set wrapper function for '%s' to &%s\n",gst_element_get_name(element),
+ GST_DEBUG_FUNCPTR_NAME(wrapper_function));
+ }
+ }
+}
+
+void gst_bin_schedule_func(GstBin *bin) {
+// GstElement *manager;
+ GList *elements;
+ GstElement *element;
+// const gchar *elementname;
+ GSList *pending = NULL;
+// GstBin *pending_bin;
+ GList *pads;
+ GstPad *pad;
+// GstElement *peer_manager;
+ GList *chains;
+ _GstBinChain *chain;
+
+ DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin)));
+ DEBUG_ENTER_STRING;
+
+ // next we have to find all the separate scheduling chains
+ DEBUG("\nattempting to find scheduling chains...\n");
+ // first make a copy of the managed_elements we can mess with
+ elements = g_list_copy (bin->managed_elements);
+ // we have to repeat until the list is empty to get all chains
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+
+ // if this is a DECOUPLED element
+ if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ // skip this element entirely
+ DEBUG("skipping '%s' because it's decoupled\n",gst_element_get_name(element));
+ elements = g_list_next (elements);
+ continue;
+ }
+
+ DEBUG("starting with element '%s'\n",gst_element_get_name(element));
+
+ // prime the pending list with the first element off the top
+ pending = g_slist_prepend (NULL, element);
+ // and remove that one from the main list
+ elements = g_list_remove (elements, element);
+
+ // create a chain structure
+ chain = g_new0 (_GstBinChain, 1);
+
+ // for each pending element, walk the pipeline
+ do {
+ // retrieve the top of the stack and pop it
+ element = GST_ELEMENT (pending->data);
+ pending = g_slist_remove (pending, element);
+
+ // add ourselves to the chain's list of elements
+ DEBUG("adding '%s' to chain\n",gst_element_get_name(element));
+ chain->elements = g_list_prepend (chain->elements, element);
+ chain->num_elements++;
+ // set the cothreads flag as appropriate
+ if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD))
+ chain->use_cothreads = TRUE;
+
+ // if we're managed by the current bin, and we're not decoupled,
+ // go find all the peers and add them to the list of elements to check
+ if ((element->manager == GST_ELEMENT(bin)) &&
+ !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ // remove ourselves from the outer list of all managed elements
+ DEBUG("removing '%s' from list of possible elements\n",gst_element_get_name(element));
+ elements = g_list_remove (elements, element);
+
+ // now we have to walk the pads to find peers
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+ DEBUG("have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+
+ // only bother with if the pad's peer's parent is this bin or it's DECOUPLED
+ // only add it if it's in the list of un-visited elements still
+ if ((g_list_find (elements, pad->peer->parent) != NULL) ||
+ GST_FLAG_IS_SET (pad->peer->parent, GST_ELEMENT_DECOUPLED)) {
+ // add the peer element to the pending list
+ DEBUG("adding '%s' to list of pending elements\n",gst_element_get_name(GST_ELEMENT(pad->peer->parent)));
+ pending = g_slist_prepend (pending, GST_ELEMENT(pad->peer->parent));
+ } else
+ DEBUG("element '%s' has already been dealt with\n",gst_element_get_name(GST_ELEMENT(pad->peer->parent)));
+ }
+ }
+ } while (pending);
+
+ // add the chain to the bin
+ DEBUG("have chain with %d elements: ",chain->num_elements);
+ { GList *elements = chain->elements;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ elements = g_list_next(elements);
+ DEBUG_NOPREFIX("%s, ",gst_element_get_name(element));
+ }
+ }
+ DEBUG_NOPREFIX("\n");
+ bin->chains = g_list_prepend (bin->chains, chain);
+ bin->num_chains++;
+ }
+ // free up the list in case it's full of DECOUPLED elements
+ g_list_free (elements);
+
+ DEBUG("\nwe have %d chains to schedule\n",bin->num_chains);
+
+ // now we have to go through all the chains and schedule them
+ chains = bin->chains;
+ while (chains) {
+ chain = (_GstBinChain *)(chains->data);
+ chains = g_list_next (chains);
+
+ // schedule as appropriate
+ if (chain->use_cothreads) {
+ gst_schedule_cothreaded_chain (bin,chain);
+ } else {
+ DEBUG("non-cothreaded case not coded yet\n");
+ }
+ }
+
+ DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+}
+
+
+/*
+ // ***** check for possible connections outside
+ // get the pad's peer
+ peer = gst_pad_get_peer (pad);
+ // FIXME this should be an error condition, if not disabled
+ if (!peer) break;
+ // get the parent of the peer of the pad
+ outside = GST_ELEMENT (gst_pad_get_parent (peer));
+ // FIXME this should *really* be an error condition
+ if (!outside) break;
+ // if it's a source or connection and it's not ours...
+ if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
+ (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("dealing with outside source element %s\n",gst_element_get_name(outside));
+// DEBUG("PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
+//GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
+// pad->pullfunc = pad->peer->pullfunc;
+// DEBUG("PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
+// pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy);
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ }
+ } else {
+*/
+
+
+
+
+
+/*
+ } else if (GST_IS_SRC (element)) {
+ DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
+ bin->entries = g_list_prepend (bin->entries,element);
+ bin->num_entries++;
+ cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
+ }
+
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD(pads->data);
+
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ // set the proxy functions
+ pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
+ DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ // set the proxy functions
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
+ &pad->pullfunc,gst_bin_pullfunc_proxy);
+ pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
+ }
+ pads = g_list_next (pads);
+ }
+ elements = g_list_next (elements);
+
+ // if there are no entries, we have to pick one at random
+ if (bin->num_entries == 0)
+ bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
+ }
+ } else {
+ DEBUG("don't need cothreads, looking for entry points\n");
+ // we have to find which elements will drive an iteration
+ elements = bin->children;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ DEBUG("found element \"%s\"\n", gst_element_get_name (element));
+ if (GST_IS_BIN (element)) {
+ gst_bin_create_plan (GST_BIN (element));
+ }
+ if (GST_IS_SRC (element)) {
+ DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
+ bin->entries = g_list_prepend (bin->entries, element);
+ bin->num_entries++;
+ }
+
+ // go through all the pads, set pointers, and check for connections
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
+
+ // copy the peer's chain function, easy enough
+ DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc);
+
+ // need to walk through and check for outside connections
+//FIXME need to do this for all pads
+ // get the pad's peer
+ peer = gst_pad_get_peer (pad);
+ if (!peer) {
+ DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad));
+ break;
+ }
+ // get the parent of the peer of the pad
+ outside = GST_ELEMENT (gst_pad_get_parent (peer));
+ if (!outside) break;
+ // if it's a connection and it's not ours...
+ if (GST_IS_CONNECTION (outside) &&
+ (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
+ gst_info("gstbin: element \"%s\" is the external source Connection "
+ "for internal element \"%s\"\n",
+ gst_element_get_name (GST_ELEMENT (outside)),
+ gst_element_get_name (GST_ELEMENT (element)));
+ bin->entries = g_list_prepend (bin->entries, outside);
+ bin->num_entries++;
+ }
+ }
+ else {
+ DEBUG("found pad %s\n", gst_pad_get_name (pad));
+ }
+ pads = g_list_next (pads);
+
+ }
+ elements = g_list_next (elements);
+ }
+*/
+
+
+
+
+/*
+ // If cothreads are needed, we need to not only find elements but
+ // set up cothread states and various proxy functions.
+ if (bin->need_cothreads) {
+ DEBUG("bin is using cothreads\n");
+
+ // first create thread context
+ if (bin->threadcontext == NULL) {
+ DEBUG("initializing cothread context\n");
+ bin->threadcontext = cothread_init ();
+ }
+
+ // walk through all the children
+ elements = bin->managed_elements;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ elements = g_list_next (elements);
+
+ // start out with a NULL warpper function, we'll set it if we want a cothread
+ wrapper_function = NULL;
+
+ // have to decide if we need to or can use a cothreads, and if so which wrapper
+ // first of all, if there's a loopfunc, the decision's already made
+ if (element->loopfunc != NULL) {
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
+ DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element));
+ } else {
+ // otherwise we need to decide if it needs a cothread
+ // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it
+ if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
+ (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
+ !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) {
+ // base it on whether we're going to loop through source or sink pads
+ if (element->numsinkpads == 0)
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
+ else
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
+ }
+ }
+
+ // walk through the all the pads for this element, setting proxy functions
+ // the selection of proxy functions depends on whether we're in a cothread or not
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+
+ // check to see if someone else gets to set up the element
+ peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
+ if (peer_manager != GST_ELEMENT(bin)) {
+ DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
+ }
+
+ // if the wrapper_function is set, we need to use the proxy functions
+ if (wrapper_function != NULL) {
+ // set up proxy functions
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ }
+ } else {
+ // otherwise we need to set up for 'traditional' chaining
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ // we can just copy the chain function, since it shares the prototype
+ DEBUG("copying chain function into push proxy for %s:%s\n",
+ GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = pad->chainfunc;
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ // we can just copy the get function, since it shares the prototype
+ DEBUG("copying get function into pull proxy for %s:%s\n",
+ GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = pad->getfunc;
+ }
+ }
+ }
+
+ // if a loopfunc has been specified, create and set up a cothread
+ if (wrapper_function != NULL) {
+ if (element->threadstate == NULL) {
+ element->threadstate = cothread_create (bin->threadcontext);
+ DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate,
+ &element->threadstate,gst_element_get_name(element));
+ }
+ cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
+ DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element),
+ GST_DEBUG_FUNCPTR_NAME(wrapper_function));
+ }
+
+// // HACK: if the element isn't decoupled, it's an entry
+// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED))
+// bin->entries = g_list_append(bin->entries, element);
+ }
+
+ // otherwise, cothreads are not needed
+ } else {
+ DEBUG("bin is chained, no cothreads needed\n");
+
+ elements = bin->managed_elements;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ elements = g_list_next (elements);
+
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = pad->chainfunc;
+ } else {
+ DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = pad->getfunc;
+ }
+ }
+ }
+ }
+*/
+
+
--- /dev/null
+/* Gnome-Streamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef __GST_SCHEDULER_H__
+#define __GST_SCHEDULER_H__
+
+#include <gst/gstbin.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+
+void gst_bin_schedule_func(GstBin *bin);
+
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* __GST_SCHEDULER_H__ */
static void
gst_thread_init (GstThread *thread)
{
+ DEBUG("initializing thread '%s'\n",gst_element_get_name(GST_ELEMENT(thread)));
+
// we're a manager by default
GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
do {
buf = gst_pad_pull (identity->sinkpad);
+ g_print("(%s:%s)i ",GST_DEBUG_PAD_NAME(identity->sinkpad));
gst_pad_push (identity->srcpad, buf);
ARG_0,
ARG_LEVEL,
ARG_MAX_LEVEL,
+ ARG_BLOCK,
};
GTK_ARG_READABLE, ARG_LEVEL);
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
+ gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL,
+ GTK_ARG_READWRITE, ARG_BLOCK);
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
static void
gst_queue_init (GstQueue *queue)
{
- GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY);
+ // scheduling on this kind of element is, well, interesting
+ GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
queue->queue = NULL;
queue->level_buffers = 0;
queue->max_buffers = 20;
+ queue->block = TRUE;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
DEBUG("queue: %s have queue lock\n", name);
+ // we bail if there's nothing there
+ if (!queue->level_buffers && !queue->block) {
+ GST_UNLOCK(queue);
+ return NULL;
+ }
+
while (!queue->level_buffers) {
STATUS("queue: %s U released lock\n");
GST_UNLOCK (queue);
case ARG_MAX_LEVEL:
queue->max_buffers = GTK_VALUE_INT (*arg);
break;
+ case ARG_BLOCK:
+ queue->block = GTK_VALUE_BOOL (*arg);
+ break;
default:
break;
}
case ARG_MAX_LEVEL:
GTK_VALUE_INT (*arg) = queue->max_buffers;
break;
+ case ARG_BLOCK:
+ GTK_VALUE_BOOL (*arg) = queue->block;
+ break;
default:
arg->type = GTK_TYPE_INVALID;
break;
gint level_buffers; /* number of buffers queued here */
gint max_buffers; /* maximum number of buffers queued here */
+ gboolean block; /* if set to FALSE, _get returns NULL if queue empty */
gint level_bytes; /* number of bytes queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */