static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_asyncdisksrc_get (GstPad *pad);
-static void gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size);
+static GstBuffer * gst_asyncdisksrc_get (GstPad *pad);
+static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size);
static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element);
*
* Push a new buffer from the asyncdisksrc at the current offset.
*/
-static void
+static GstBuffer *
gst_asyncdisksrc_get (GstPad *pad)
{
GstAsyncDiskSrc *src;
GstBuffer *buf;
- g_return_if_fail (pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad));
- g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN));
-
+ g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL);
+
/* deal with EOF state */
if (src->curoffset >= src->size) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
-
- g_return_if_fail (buf != NULL);
+
+ g_return_val_if_fail (buf != NULL, NULL);
/* simply set the buffer to point to the correct region of the file */
GST_BUFFER_DATA (buf) = src->map + src->curoffset;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE);
- if ((src->curoffset + src->bytes_per_read) >
- src->size) {
+ if ((src->curoffset + src->bytes_per_read) > src->size) {
GST_BUFFER_SIZE (buf) = src->size - src->curoffset;
// FIXME: set the buffer's EOF bit here
} else
src->new_seek = FALSE;
}
- /* we're done, push the buffer off now */
- gst_pad_push (pad, buf);
+ /* we're done, return the buffer */
+ return buf;
}
/**
*
* Push a new buffer from the asyncdisksrc of given size at given offset.
*/
-static void
+static GstBuffer *
gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size)
{
GstAsyncDiskSrc *src;
GstBuffer *buf;
- g_return_if_fail (pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad));
- g_return_if_fail (GST_IS_ASYNCDISKSRC (src));
- g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN));
+ g_return_val_if_fail (GST_IS_ASYNCDISKSRC (src), NULL);
+ g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL);
/* deal with EOF state */
if (offset >= src->size) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf != NULL, NULL);
/* simply set the buffer to point to the correct region of the file */
GST_BUFFER_DATA (buf) = src->map + offset;
static gboolean gst_audiosrc_open_audio (GstAudioSrc *src);
static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc);
-static void gst_audiosrc_get (GstPad *pad);
+static GstBuffer * gst_audiosrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 };
audiosrc->seq = 0;
}
-void gst_audiosrc_get(GstPad *pad) {
+static GstBuffer *
+gst_audiosrc_get (GstPad *pad)
+{
GstAudioSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_AUDIOSRC(gst_pad_get_parent(pad));
// g_print("attempting to read something from soundcard\n");
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf, NULL);
GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read);
-
+
readbytes = read (src->fd,GST_BUFFER_DATA (buf),
src->bytes_per_read);
if (readbytes == 0) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
GST_BUFFER_SIZE (buf) = readbytes;
// gst_buffer_add_meta(buf,GST_META(newmeta));
- gst_pad_push (pad,buf);
// g_print("pushed buffer from soundcard of %d bytes\n",readbytes);
+ return buf;
}
static void
static void gst_disksrc_close_file (GstDiskSrc *src);
-static void gst_disksrc_get (GstPad *pad);
+static GstBuffer * gst_disksrc_get (GstPad *pad);
static GstElementStateReturn gst_disksrc_change_state (GstElement *element);
}
}
-static void
+static GstBuffer *
gst_disksrc_get (GstPad *pad)
{
GstDiskSrc *src;
DEBUG("pushing %d bytes with offset %d\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
/* we're done, push the buffer off now */
- gst_pad_push (pad, buf);
- DEBUG("pushing %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
+ DEBUG("returning %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
+ return buf;
}
enum {
ARG_0,
- /* FILL ME */
+ ARG_NUM_SOURCES,
};
-static void gst_fakesrc_class_init (GstFakeSrcClass *klass);
-static void gst_fakesrc_init (GstFakeSrc *fakesrc);
+static void gst_fakesrc_class_init (GstFakeSrcClass *klass);
+static void gst_fakesrc_init (GstFakeSrc *fakesrc);
+
+static void gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fakesrc_get (GstPad *pad);
+static GstBuffer * gst_fakesrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 };
static void
gst_fakesrc_class_init (GstFakeSrcClass *klass)
{
+ GtkObjectClass *gtkobject_class;
GstSrcClass *gstsrc_class;
+ gtkobject_class = (GtkObjectClass*)klass;
gstsrc_class = (GstSrcClass*)klass;
parent_class = gtk_type_class (GST_TYPE_SRC);
+
+ gtk_object_add_arg_type ("GstFakeSrc::num_sources", GTK_TYPE_INT,
+ GTK_ARG_READWRITE, ARG_NUM_SOURCES);
+
+ gtkobject_class->set_arg = gst_fakesrc_set_arg;
+ gtkobject_class->get_arg = gst_fakesrc_get_arg;
}
static void gst_fakesrc_init(GstFakeSrc *fakesrc) {
- // create our output pad
- fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC);
- gst_pad_set_get_function(fakesrc->srcpad,gst_fakesrc_get);
- gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad);
+ GstPad *pad;
+
+ // set the default number of
+ fakesrc->numsrcpads = 1;
+
+ // create our first output pad
+ pad = gst_pad_new("src",GST_PAD_SRC);
+ gst_pad_set_get_function(pad,gst_fakesrc_get);
+ gst_element_add_pad(GST_ELEMENT(fakesrc),pad);
+ fakesrc->srcpads = g_slist_append(NULL,pad);
// we're ready right away, since we don't have any args...
// gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY);
}
-/**
- * gst_fakesrc_new:
- * @name: then name of the fakse source
- *
- * create a new fakesrc
- *
- * Returns: The new element.
- */
-GstElement *gst_fakesrc_new(gchar *name) {
- GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC));
- gst_element_set_name(GST_ELEMENT(fakesrc),name);
- return fakesrc;
+static void
+gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id)
+{
+ GstFakeSrc *src;
+ gint new_numsrcs;
+ GstPad *pad;
+
+ /* it's not null if we got it, but it might not be ours */
+ src = GST_FAKESRC (object);
+
+ switch(id) {
+ case ARG_NUM_SOURCES:
+ new_numsrcs = GTK_VALUE_INT (*arg);
+ if (new_numsrcs > src->numsrcpads) {
+ while (src->numsrcpads != new_numsrcs) {
+ pad = gst_pad_new(g_strdup_printf("src%d",src->numsrcpads),GST_PAD_SRC);
+ gst_pad_set_get_function(pad,gst_fakesrc_get);
+ gst_element_add_pad(GST_ELEMENT(src),pad);
+ src->srcpads = g_slist_append(src->srcpads,pad);
+ src->numsrcpads;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+static void
+gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id)
+{
+ GstFakeSrc *src;
+
+ /* it's not null if we got it, but it might not be ours */
+ g_return_if_fail (GST_IS_FAKESRC (object));
+
+ src = GST_FAKESRC (object);
+
+ switch (id) {
+ case ARG_NUM_SOURCES:
+ GTK_VALUE_INT (*arg) = src->numsrcpads;
+ break;
+ default:
+ arg->type = GTK_TYPE_INVALID;
+ break;
+ }
}
+
/**
* gst_fakesrc_get:
* @src: the faksesrc to get
*
* generate an empty buffer and push it to the next element.
*/
-void gst_fakesrc_get(GstPad *pad) {
+static GstBuffer *
+gst_fakesrc_get(GstPad *pad)
+{
GstFakeSrc *src;
GstBuffer *buf;
g_print("(%s:%s)> ",GST_DEBUG_PAD_NAME(pad));
buf = gst_buffer_new();
- gst_pad_push(pad,buf);
+ return buf;
}
struct _GstFakeSrc {
GstSrc src;
- GstPad *srcpad;
+ gint numsrcpads;
+ GSList *srcpads;
};
struct _GstFakeSrcClass {
};
-static void gst_fdsrc_class_init (GstFdSrcClass *klass);
-static void gst_fdsrc_init (GstFdSrc *fdsrc);
+static void gst_fdsrc_class_init (GstFdSrcClass *klass);
+static void gst_fdsrc_init (GstFdSrc *fdsrc);
-static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fdsrc_get (GstPad *pad);
+static GstBuffer * gst_fdsrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
}
}
-void gst_fdsrc_get(GstPad *pad) {
+static GstBuffer *
+gst_fdsrc_get(GstPad *pad)
+{
GstFdSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_FDSRC(gst_pad_get_parent(pad));
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf, NULL);
/* allocate the space for the buffer data */
GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read);
- g_return_if_fail(GST_BUFFER_DATA(buf) != NULL);
+ g_return_val_if_fail(GST_BUFFER_DATA(buf) != NULL, NULL);
/* read it in from the file */
readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read);
if (readbytes == 0) {
gst_src_signal_eos(GST_SRC(src));
- return;
+ return NULL;
}
/* if we didn't get as many bytes as we asked for, we're at EOF */
GST_BUFFER_SIZE(buf) = readbytes;
src->curoffset += readbytes;
- /* we're done, push the buffer off now */
- gst_pad_push(pad,buf);
+ /* we're done, return the buffer */
+ return buf;
}
static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
static GstElementStateReturn gst_httpsrc_change_state (GstElement *element);
-static void gst_httpsrc_get (GstPad *pad);
+static GstBuffer * gst_httpsrc_get (GstPad *pad);
static gboolean gst_httpsrc_open_url (GstHttpSrc *src);
static void gst_httpsrc_close_url (GstHttpSrc *src);
httpsrc->bytes_per_read = 4096;
}
-static void gst_httpsrc_get(GstPad *pad) {
+static GstBuffer *
+gst_httpsrc_get(GstPad *pad)
+{
GstHttpSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_HTTPSRC(gst_pad_get_parent(pad));
buf = gst_buffer_new();
if (readbytes == 0) {
gst_src_signal_eos(GST_SRC(src));
- return;
+ return NULL;
}
if (readbytes < src->bytes_per_read) {
GST_BUFFER_SIZE(buf) = readbytes;
src->curoffset += readbytes;
- gst_pad_push(pad,buf);
+ return buf;
}
static gboolean
static void
gst_queue_init (GstQueue *queue)
{
+ GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY);
+
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
- gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+ gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
- gst_pad_set_get_function (queue->srcpad, gst_queue_get);
+ gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
queue->queue = NULL;
/* we have to lock the queue since we span threads */
- DEBUG("queue: try have queue lock\n");
+// DEBUG("queue: try have queue lock\n");
GST_LOCK (queue);
- DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
- DEBUG("queue: have queue lock\n");
+// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
+// DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
gst_queue_flush (queue);
}
- DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
+// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
while (queue->level_buffers >= queue->max_buffers) {
- DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
+// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n");
GST_UNLOCK (queue);
g_mutex_lock (queue->fulllock);
g_mutex_unlock (queue->fulllock);
GST_LOCK (queue);
STATUS("%s: O+\n");
- DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
+// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
- STATUS("%s: +\n");
+// STATUS("%s: +\n");
+ g_print("(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad));
/* if we were empty, but aren't any more, signal a condition */
tosignal = (queue->level_buffers >= 0);
queue->level_buffers++;
/* we can unlock now */
- DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
+// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue);
if (tosignal) {
g_mutex_lock (queue->emptylock);
- STATUS("%s: >\n");
+// STATUS("%s: >\n");
g_cond_signal (queue->emptycond);
- STATUS("%s: >>\n");
+// STATUS("%s: >>\n");
g_mutex_unlock (queue->emptylock);
}
}
name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */
- DEBUG("queue: %s try have queue lock\n", name);
+// DEBUG("queue: %s try have queue lock\n", name);
GST_LOCK (queue);
- DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
- DEBUG("queue: %s have queue lock\n", name);
+// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
+// DEBUG("queue: %s have queue lock\n", name);
while (!queue->level_buffers) {
- STATUS("queue: %s U released lock\n");
- GST_UNLOCK (queue);
+// STATUS("queue: %s U released lock\n");
+// GST_UNLOCK (queue);
g_mutex_lock (queue->emptylock);
g_cond_wait (queue->emptycond, queue->emptylock);
g_mutex_unlock (queue->emptylock);
GST_LOCK (queue);
- STATUS("queue: %s U- getting lock\n");
+// STATUS("queue: %s U- getting lock\n");
}
front = queue->queue;
g_slist_free (front);
queue->level_buffers--;
- STATUS("%s: -\n");
+// STATUS("%s: -\n");
+ g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
tosignal = queue->level_buffers < queue->max_buffers;
GST_UNLOCK(queue);
if (tosignal) {
g_mutex_lock (queue->fulllock);
- STATUS("%s: < \n");
+// STATUS("%s: < \n");
g_cond_signal (queue->fullcond);
- STATUS("%s: << \n");
+// STATUS("%s: << \n");
g_mutex_unlock (queue->fulllock);
}
- DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
+// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
gst_pad_push (queue->srcpad, buf);
- DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
+// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
/* unlock now */
}
//static gboolean gst_sinesrc_open_audio(GstSineSrc *src);
void gst_sinesrc_sync_parms(GstSineSrc *sinesrc);
-void gst_sinesrc_get(GstPad *pad);
+static GstBuffer * gst_sinesrc_get(GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 };
return sinesrc;
}
-void gst_sinesrc_get(GstPad *pad) {
+static GstBuffer *
+gst_sinesrc_get(GstPad *pad)
+{
GstSineSrc *src;
GstBuffer *buf;
gint16 *samples;
gint volume;
gdouble val;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_SINESRC(gst_pad_get_parent(pad));
buf = gst_buffer_new();
- g_return_if_fail(buf);
+ g_return_val_if_fail (buf, NULL);
GST_BUFFER_DATA(buf) = (gpointer)malloc(4096);
samples = (gint16*)GST_BUFFER_DATA(buf);
GST_BUFFER_SIZE(buf) = 4096;
src->sentmeta = TRUE;
}
- gst_pad_push(pad,buf);
g_print(">");
+ return buf;
}
static void gst_sinesrc_set_arg(GtkObject *object,GtkArg *arg,guint id) {
static void
gst_bin_init (GstBin *bin)
{
+ // in general, we prefer to use cothreads for most things
+ GST_FLAG_SET (bin, GST_BIN_FLAG_PREFER_COTHREADS);
+
bin->numchildren = 0;
bin->children = NULL;
// FIXME temporary testing measure
GList *children;
GstElement *child;
+ DEBUG_ENTER("(\"%s\")",gst_element_get_name (element));
+
g_return_val_if_fail (GST_IS_BIN (element), GST_STATE_FAILURE);
bin = GST_BIN (element);
- g_print("gst_bin_change_state(\"%s\"): currently %d(%s), %d(%s) pending\n",
- gst_element_get_name (element), GST_STATE (element),
+ DEBUG("currently %d(%s), %d(%s) pending\n", GST_STATE (element),
_gst_print_statename (GST_STATE (element)), GST_STATE_PENDING (element),
_gst_print_statename (GST_STATE_PENDING (element)));
children = bin->children;
while (children) {
child = GST_ELEMENT (children->data);
- g_print("gst_bin_change_state: setting state on '%s'\n",
- gst_element_get_name (child));
+ DEBUG("setting state on '%s'\n",gst_element_get_name (child));
switch (gst_element_set_state (child, GST_STATE_PENDING (element))) {
case GST_STATE_FAILURE:
GST_STATE_PENDING (element) = GST_STATE_NONE_PENDING;
- g_print("gstbin: child '%s' failed to go to state %d(%s)\n", gst_element_get_name (child),
- GST_STATE_PENDING (element), _gst_print_statename (GST_STATE_PENDING (element)));
+ DEBUG("child '%s' failed to go to state %d(%s)\n", gst_element_get_name (child),
+ GST_STATE_PENDING (element), _gst_print_statename (GST_STATE_PENDING (element)));
return GST_STATE_FAILURE;
break;
case GST_STATE_ASYNC:
- g_print("gstbin: child '%s' is changing state asynchronously\n", gst_element_get_name (child));
+ DEBUG("child '%s' is changing state asynchronously\n", gst_element_get_name (child));
break;
}
// g_print("\n");
/**
* gst_bin_create_plan:
- * @bin: #Gstbin to create the plan for
+ * @bin: #GstBin to create the plan for
*
* let the bin figure out how to handle the plugins in it.
*/
gst_bin_loopfunc_wrapper (int argc,char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
- GList *pads;
- GstPad *pad;
- GstBuffer *buf;
G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
DEBUG_ENTER("(%d,'%s')",argc,name);
-// DEBUG("entering gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
-// argc,gst_element_get_name (element));
-
- if (element->loopfunc != NULL) {
- DEBUG("element %s has loop function, calling it\n", name);
+ do {
+ DEBUG("calling loopfunc %s for element %s\n",
+ GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name);
(element->loopfunc) (element);
DEBUG("element %s ended loop function\n", name);
- } else {
- DEBUG("element %s is chain-based\n", name);
- DEBUG("stepping through pads\n");
- do {
- pads = element->pads;
- while (pads) {
- pad = GST_PAD (pads->data);
- if (pad->direction == GST_PAD_SINK) {
- DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
- buf = gst_pad_pull (pad);
- DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
- (pad->chainfunc) (pad,buf);
- DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
- }
- pads = g_list_next (pads);
+ } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
+
+ DEBUG_LEAVE("(%d,'%s')",argc,name);
+ return 0;
+}
+
+static int
+gst_bin_chain_wrapper (int argc,char *argv[])
+{
+ GstElement *element = GST_ELEMENT (argv);
+ G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
+ GList *pads;
+ GstPad *pad;
+ GstBuffer *buf;
+
+ DEBUG_ENTER("(\"%s\")",name);
+ DEBUG("stepping through pads\n");
+ do {
+ pads = element->pads;
+ while (pads) {
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+ if (pad->direction == GST_PAD_SINK) {
+ DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
+ buf = gst_pad_pull (pad);
+ DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
+ (pad->chainfunc) (pad,buf);
+ DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
}
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
- }
+ }
+ } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
DEBUG_LEAVE("(%d,'%s')",argc,name);
GstElement *element = GST_ELEMENT (argv);
GList *pads;
GstPad *pad;
+ GstBuffer *buf;
G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
DEBUG_ENTER("(%d,\"%s\")",argc,name);
//gst_src_push_region (GST_SRC (element), region->offset, region->size);
if (pad->getregionfunc == NULL)
fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name);
- (pad->getregionfunc)(pad, region->offset, region->size);
+ buf = (pad->getregionfunc)(pad, region->offset, region->size);
} else {
if (pad->getfunc == NULL)
fprintf(stderr,"error, no getfunc in \"%s\"\n", name);
- (pad->getfunc)(pad);
+ buf = (pad->getfunc)(pad);
}
+
+ DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ gst_pad_push (pad, buf);
}
pads = g_list_next(pads);
}
}
static void
-gst_bin_pullregionfunc_proxy (GstPad *pad,
- gulong offset,
- gulong size)
-{
- region_struct region;
- cothread_state *threadstate;
-
- DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size);
-
- region.offset = offset;
- region.size = size;
-
- threadstate = GST_ELEMENT(pad->parent)->threadstate;
- cothread_set_data (threadstate, "region", ®ion);
- cothread_switch (threadstate);
- cothread_set_data (threadstate, "region", NULL);
-}
-
-static void
gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf)
{
cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate;
return buf;
}
-static void
-gst_bin_pushfunc_fake_proxy (GstPad *pad)
+static GstBuffer *
+gst_bin_chainfunc_proxy (GstPad *pad)
+{
+ GstBuffer *buf;
+}
+
+// FIXME!!!
+static void
+gst_bin_pullregionfunc_proxy (GstPad *pad,
+ gulong offset,
+ gulong size)
{
+ region_struct region;
+ cothread_state *threadstate;
+
+ DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size);
+
+ region.offset = offset;
+ region.size = size;
+
+ threadstate = GST_ELEMENT(pad->parent)->threadstate;
+ cothread_set_data (threadstate, "region", ®ion);
+ cothread_switch (threadstate);
+ cothread_set_data (threadstate, "region", NULL);
}
+
static void
gst_bin_create_plan_func (GstBin *bin)
{
+ GstElement *manager;
GList *elements;
GstElement *element;
- int sink_pads;
+ const gchar *elementname;
+ GSList *pending_bins = NULL;
+ GstBin *pending_bin;
GList *pads;
- GstPad *pad, *peer;
- GstElement *outside;
+ GstPad *pad;
+ GstElement *peer_manager;
+ cothread_func wrapper_function;
- DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+ DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_ENTER_STRING;
- // first loop through all children to see if we need cothreads
- // we break immediately when we find we need to, why keep searching?
+ // first figure out which element is the manager of this and all child elements
+ // if we're a managing bin ourselves, that'd be us
+ if (GST_FLAG_IS_SET (bin, GST_BIN_FLAG_MANAGER)) {
+ manager = GST_ELEMENT (bin);
+ DEBUG("setting manager to self\n");
+ // otherwise, it's what our parent says it is
+ } else {
+ manager = gst_element_get_manager (GST_ELEMENT (bin));
+ DEBUG("setting manager to \"%s\"\n", gst_element_get_name (manager));
+ }
+
+ // perform the first recursive pass of plan generation
+ // we set the manager of every element but those who manage themselves
+ // the need for cothreads is also determined recursively
+ DEBUG("performing first-phase recursion\n");
+ bin->need_cothreads = bin->use_cothreads;
+ if (bin->need_cothreads)
+ DEBUG("requiring cothreads because we're forced to\n");
+
elements = bin->children;
while (elements) {
element = GST_ELEMENT (elements->data);
-
- DEBUG("found element \"%s\" in bin \"%s\"\n",
- gst_element_get_name (element),
- gst_element_get_name (GST_ELEMENT (bin)));
-
- // if it's a loop-based element, use cothreads
- if (element->loopfunc != NULL) {
- DEBUG("loop based element \"%s\" in bin \"%s\"\n",
- gst_element_get_name (element),
- gst_element_get_name (GST_ELEMENT (bin)));
-
- bin->need_cothreads = TRUE;
- DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element));
- break;
- }
-
- // if it's a complex element, use cothreads
- else if (GST_ELEMENT_IS_MULTI_IN (element)) {
- DEBUG("complex element \"%s\" in bin \"%s\"\n",
- gst_element_get_name (element),
- gst_element_get_name (GST_ELEMENT (bin)));
-
- bin->need_cothreads = TRUE;
- DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element));
- break;
+ elements = g_list_next (elements);
+#ifdef GST_DEBUG_ENABLED
+ elementname = gst_element_get_name (element);
+#endif
+ DEBUG("have element \"%s\"\n",elementname);
+
+ // first set their manager
+ DEBUG("setting manager of \"%s\" to \"%s\"\n",elementname,gst_element_get_name(manager));
+ gst_element_set_manager (element, manager);
+
+ // we do recursion and such for Bins
+ if (GST_IS_BIN (element)) {
+ // recurse into the child Bin
+ DEBUG("recursing into child Bin \"%s\"\n",elementname);
+ gst_bin_create_plan (GST_BIN (element));
+ // check to see if it needs cothreads and isn't self-managing
+ if (((GST_BIN (element))->need_cothreads) && !GST_FLAG_IS_SET(element,GST_BIN_FLAG_MANAGER)) {
+ DEBUG("requiring cothreads because child bin \"%s\" does\n",elementname);
+ bin->need_cothreads = TRUE;
+ }
+ } else {
+ // then we need to determine whether they need cothreads
+ // if it's a loop-based element, use cothreads
+ if (element->loopfunc != NULL) {
+ DEBUG("requiring cothreads because \"%s\" is a loop-based element\n",elementname);
+ bin->need_cothreads = TRUE;
+ // if it's a 'complex' element, use cothreads
+ } else if (GST_FLAG_IS_SET (element, GST_ELEMENT_COMPLEX)) {
+ DEBUG("requiring cothreads because \"%s\" is complex\n",elementname);
+ bin->need_cothreads = TRUE;
+ // if the element has more than one sink pad, use cothreads
+ } else if (element->numsinkpads > 1) {
+ DEBUG("requiring cothreads because \"%s\" has more than one sink pad\n",elementname);
+ bin->need_cothreads = TRUE;
+ }
}
+ }
- // if it has more than one input pad, use cothreads
- sink_pads = 0;
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD (pads->data);
- if (pad->direction == GST_PAD_SINK)
- sink_pads++;
- pads = g_list_next (pads);
- }
- if (sink_pads > 1) {
- DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n",
- gst_element_get_name (element),
- gst_element_get_name (GST_ELEMENT (bin)));
-
- bin->need_cothreads = TRUE;
- DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element));
- break;
- }
- elements = g_list_next (elements);
+ // if we're not a manager thread, we're done.
+ if (!GST_FLAG_IS_SET (bin, GST_BIN_FLAG_MANAGER)) {
+ DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+ return;
}
- // FIXME
-// bin->need_cothreads &= bin->use_cothreads;
- // FIXME temporary testing measure
- if (bin->use_cothreads) bin->need_cothreads = TRUE;
-
// clear previous plan state
+ g_list_free (bin->managed_elements);
+ bin->managed_elements = NULL;
+ bin->num_managed_elements = 0;
g_list_free (bin->entries);
bin->entries = NULL;
- bin->numentries = 0;
+ bin->num_entries = 0;
+
+ // find all the managed children
+ // here we pull off the trick of walking an entire arbitrary tree without recursion
+ DEBUG("attempting to find all the elements to manage\n");
+ pending_bins = g_slist_prepend (pending_bins, bin);
+ do {
+ // retrieve the top of the stack and pop it
+ pending_bin = GST_BIN (pending_bins->data);
+ pending_bins = g_slist_remove (pending_bins, pending_bin);
+
+ // walk the list of elements, find bins, and do stuff
+ DEBUG("checking Bin \"%s\" for managed elements\n",
+ gst_element_get_name (GST_ELEMENT (pending_bin)));
+ elements = pending_bin->children;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ elements = g_list_next (elements);
+#ifdef GST_DEBUG_ENABLED
+ elementname = gst_element_get_name (element);
+#endif
+
+ // if it's ours, add it to the list
+ if (element->manager == GST_ELEMENT(bin)) {
+ // if it's a Bin, add it to the list of Bins to check
+ if (GST_IS_BIN (element)) {
+ DEBUG("flattened recurse into \"%s\"\n",elementname);
+ pending_bins = g_slist_prepend (pending_bins, element);
+ // otherwise add it to the list of elements
+ } else {
+ DEBUG("found element \"%s\" that I manage\n",elementname);
+ bin->managed_elements = g_list_prepend (bin->managed_elements, element);
+ bin->num_managed_elements++;
+ }
+ }
+ }
+ } while (pending_bins);
+
+ DEBUG("have %d elements to manage, implementing plan\n",bin->num_managed_elements);
+ // If cothreads are needed, we need to not only find elements but
+ // set up cothread states and various proxy functions.
if (bin->need_cothreads) {
// first create thread context
if (bin->threadcontext == NULL) {
+ DEBUG("initializing cothread context\n");
bin->threadcontext = cothread_init ();
- DEBUG("initialized cothread context\n");
}
// walk through all the children
- elements = bin->children;
+ elements = bin->managed_elements;
while (elements) {
element = GST_ELEMENT (elements->data);
-
- // start by creating thread state for the element
- if (element->threadstate == NULL) {
- element->threadstate = cothread_create (bin->threadcontext);
- cothread_setfunc (element->threadstate, gst_bin_loopfunc_wrapper,
- 0, (char **)element);
- DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate,
- &element->threadstate,gst_element_get_name(element));
- }
-
- if (GST_IS_BIN (element)) {
- gst_bin_create_plan (GST_BIN (element));
-
- } else if (GST_IS_SRC (element)) {
- DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
- bin->entries = g_list_prepend (bin->entries,element);
- bin->numentries++;
- cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
- }
-
- pads = gst_element_get_pad_list (element);
- while (pads) {
- pad = GST_PAD(pads->data);
-
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- // set the proxy functions
- pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
- DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
- } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
- DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
- // set the proxy functions
- pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
- DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
- &pad->pullfunc,gst_bin_pullfunc_proxy);
- pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
- }
- pads = g_list_next (pads);
- }
elements = g_list_next (elements);
- // if there are no entries, we have to pick one at random
- if (bin->numentries == 0)
- bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
- }
- } else {
- DEBUG("don't need cothreads, looking for entry points\n");
- // we have to find which elements will drive an iteration
- elements = bin->children;
- while (elements) {
- element = GST_ELEMENT (elements->data);
- DEBUG("found element \"%s\"\n", gst_element_get_name (element));
- if (GST_IS_BIN (element)) {
- gst_bin_create_plan (GST_BIN (element));
- }
- if (GST_IS_SRC (element)) {
- DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
- bin->entries = g_list_prepend (bin->entries, element);
- bin->numentries++;
+ // start out with a NULL warpper function, we'll set it if we want a cothread
+ wrapper_function = NULL;
+
+ // have to decide if we need to or can use a cothreads, and if so which wrapper
+ // first of all, if there's a loopfunc, the decision's already made
+ if (element->loopfunc != NULL) {
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper);
+ DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element));
+ } else {
+ // otherwise we need to decide if it needs a cothread
+ // if it's complex, or cothreads are preferred and it's *not* passive, cothread it
+ if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) ||
+ (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) &&
+ !GST_FLAG_IS_SET (element,GST_ELEMENT_SCHEDULE_PASSIVELY))) {
+ // base it on whether we're going to loop through source or sink pads
+ if (element->numsinkpads == 0)
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper);
+ else
+ wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper);
+ }
}
- // go through all the pads, set pointers, and check for connections
+ // walk through the all the pads for this element, setting proxy functions
+ // the selection of proxy functions depends on whether we're in a cothread or not
pads = gst_element_get_pad_list (element);
while (pads) {
pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
- if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
- DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
-
- // copy the peer's chain function, easy enough
- DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
- pad->pushfunc = pad->peer->chainfunc;
+ // check to see if someone else gets to set up the element
+ peer_manager = GST_ELEMENT((pad)->peer->parent)->manager;
+ if (peer_manager != GST_ELEMENT(bin))
+ DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad));
- // need to walk through and check for outside connections
-//FIXME need to do this for all pads
- /* get the pad's peer */
- peer = gst_pad_get_peer (pad);
- if (!peer) {
- DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad));
- break;
- }
- /* get the parent of the peer of the pad */
- outside = GST_ELEMENT (gst_pad_get_parent (peer));
- if (!outside) break;
- /* if it's a connection and it's not ours... */
- if (GST_IS_CONNECTION (outside) &&
- (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
- gst_info("gstbin: element \"%s\" is the external source Connection "
- "for internal element \"%s\"\n",
- gst_element_get_name (GST_ELEMENT (outside)),
- gst_element_get_name (GST_ELEMENT (element)));
- bin->entries = g_list_prepend (bin->entries, outside);
- bin->numentries++;
- }
- }
- else {
- DEBUG("found pad %s\n", gst_pad_get_name (pad));
- }
- pads = g_list_next (pads);
+ // if the wrapper_function is set, we need to use the proxy functions
+ if (wrapper_function != NULL) {
+ // set up proxy functions
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ }
+ } else {
+ // otherwise we need to set up for 'traditional' chaining
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ // we can just copy the chain function, since it shares the prototype
+ DEBUG("copying chain function into push proxy for %s:%s\n",
+ GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = pad->chainfunc;
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ // we can just copy the get function, since it shares the prototype
+ DEBUG("copying get function into pull proxy for %s:%s\n",
+ GST_DEBUG_PAD_NAME(pad));
+ pad->pullfunc = pad->getfunc;
+ }
+ }
+ }
+ // if a loopfunc has been specified, create and set up a cothread
+ if (wrapper_function != NULL) {
+ if (element->threadstate == NULL) {
+ element->threadstate = cothread_create (bin->threadcontext);
+ DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate,
+ &element->threadstate,gst_element_get_name(element));
+ }
+ cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element);
+ DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element),
+ GST_DEBUG_FUNCPTR_NAME(wrapper_function));
}
- elements = g_list_next (elements);
+
+// // HACK: if the element isn't passive, it's an entry
+// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_SCHEDULE_PASSIVELY))
+// bin->entries = g_list_append(bin->entries, element);
}
+
+ // otherwise, cothreads are not needed
+ } else {
+ ;
}
DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
GstElement *entry;
GList *pads;
GstPad *pad;
- _GstBinOutsideSchedule *sched;
+ GstBuffer *buf;
DEBUG_SET_STRING("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_ENTER_STRING;
// FIXME this should be lots more intelligent about where to start
DEBUG("starting iteration via cothreads\n");
- if (GST_IS_ELEMENT(bin->entries->data)) {
- entry = GST_ELEMENT (bin->entries->data);
- GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
- DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
- gst_element_get_name(entry),entry);
- cothread_switch (entry->threadstate);
- } else {
- sched = (_GstBinOutsideSchedule *) (bin->entries->data);
- sched->flags |= GST_ELEMENT_COTHREAD_STOPPING;
- DEBUG("set COTHREAD STOPPING flag on sched for \"%s\"(@%p)\n",
- gst_element_get_name(sched->element),sched->element);
- cothread_switch (sched->threadstate);
- }
+ entry = GST_ELEMENT (bin->managed_elements->data);
+ GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
+ DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
+ gst_element_get_name(entry),entry);
+ cothread_switch (entry->threadstate);
} else {
DEBUG("starting iteration via chain-functions\n");
- if (bin->numentries <= 0) {
+ if (bin->num_entries <= 0) {
//printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin)));
// we will try loop over the elements then...
entries = bin->children;
if (pad->getfunc == NULL)
fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry));
else
- (pad->getfunc)(pad);
+ buf = (pad->getfunc)(pad);
+ gst_pad_push(pad,buf);
}
pads = g_list_next (pads);
}
}
} else {
*/
+
+
+
+
+
+/*
+ } else if (GST_IS_SRC (element)) {
+ DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
+ bin->entries = g_list_prepend (bin->entries,element);
+ bin->num_entries++;
+ cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
+ }
+
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD(pads->data);
+
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ // set the proxy functions
+ pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy);
+ DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy);
+ } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
+ DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
+ // set the proxy functions
+ pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy);
+ DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n",
+ &pad->pullfunc,gst_bin_pullfunc_proxy);
+ pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy);
+ }
+ pads = g_list_next (pads);
+ }
+ elements = g_list_next (elements);
+
+ // if there are no entries, we have to pick one at random
+ if (bin->num_entries == 0)
+ bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
+ }
+ } else {
+ DEBUG("don't need cothreads, looking for entry points\n");
+ // we have to find which elements will drive an iteration
+ elements = bin->children;
+ while (elements) {
+ element = GST_ELEMENT (elements->data);
+ DEBUG("found element \"%s\"\n", gst_element_get_name (element));
+ if (GST_IS_BIN (element)) {
+ gst_bin_create_plan (GST_BIN (element));
+ }
+ if (GST_IS_SRC (element)) {
+ DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element));
+ bin->entries = g_list_prepend (bin->entries, element);
+ bin->num_entries++;
+ }
+
+ // go through all the pads, set pointers, and check for connections
+ pads = gst_element_get_pad_list (element);
+ while (pads) {
+ pad = GST_PAD (pads->data);
+
+ if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
+ DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad));
+
+ // copy the peer's chain function, easy enough
+ DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad));
+ pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc);
+
+ // need to walk through and check for outside connections
+//FIXME need to do this for all pads
+ // get the pad's peer
+ peer = gst_pad_get_peer (pad);
+ if (!peer) {
+ DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad));
+ break;
+ }
+ // get the parent of the peer of the pad
+ outside = GST_ELEMENT (gst_pad_get_parent (peer));
+ if (!outside) break;
+ // if it's a connection and it's not ours...
+ if (GST_IS_CONNECTION (outside) &&
+ (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
+ gst_info("gstbin: element \"%s\" is the external source Connection "
+ "for internal element \"%s\"\n",
+ gst_element_get_name (GST_ELEMENT (outside)),
+ gst_element_get_name (GST_ELEMENT (element)));
+ bin->entries = g_list_prepend (bin->entries, outside);
+ bin->num_entries++;
+ }
+ }
+ else {
+ DEBUG("found pad %s\n", gst_pad_get_name (pad));
+ }
+ pads = g_list_next (pads);
+
+ }
+ elements = g_list_next (elements);
+ }
+*/
+
#define GST_IS_BIN_CLASS(obj) \
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_BIN))
-#define GST_BIN_FLAG_LAST (GST_ELEMENT_FLAG_LAST + 2)
+typedef enum {
+ // this bin is a manager of child elements, i.e. a pipeline or thread
+ GST_BIN_FLAG_MANAGER = GST_ELEMENT_FLAG_LAST,
+
+ // we prefer to have cothreads when its an option, over chain-based
+ GST_BIN_FLAG_PREFER_COTHREADS,
+
+ /* padding */
+ GST_BIN_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 4,
+} GstBinFlags;
typedef struct _GstBin GstBin;
typedef struct _GstBinClass GstBinClass;
// iteration state
gboolean need_cothreads;
+ GList *managed_elements;
+ gint num_managed_elements;
GList *entries;
- gint numentries;
+ gint num_entries;
cothread_context *threadcontext;
gboolean use_cothreads;
struct _GstBinClass {
GstElementClass parent_class;
+ /* signals */
void (*object_added) (GstObject *object, GstObject *child);
/* change the state of elements of the given type */
gboolean (*change_state_type) (GstBin *bin,
GstElementState state,
GtkType type);
-
/* create a plan for the execution of the bin */
void (*create_plan) (GstBin *bin);
-
/* run a full iteration of operation */
void (*iterate) (GstBin *bin);
};
-/* this struct is used for odd scheduling cases */
-typedef struct __GstBinOutsideSchedule {
- guint32 flags;
- GstElement *element;
- GstBin *bin;
- cothread_state *threadstate;
- GSList *padlist;
-} _GstBinOutsideSchedule;
-
GtkType gst_bin_get_type (void);
#define gst_bin_destroy(bin) gst_object_destroy(GST_OBJECT(bin))
/* add and remove elements from the bin */
-void gst_bin_add (GstBin *bin, GstElement *element);
-void gst_bin_remove (GstBin *bin, GstElement *element);
+void gst_bin_add (GstBin *bin,
+ GstElement *element);
+void gst_bin_remove (GstBin *bin,
+ GstElement *element);
/* retrieve a single element or the list of children */
-GstElement* gst_bin_get_by_name (GstBin *bin, gchar *name);
+GstElement* gst_bin_get_by_name (GstBin *bin,
+ gchar *name);
GList* gst_bin_get_list (GstBin *bin);
void gst_bin_create_plan (GstBin *bin);
gboolean gst_bin_set_state_type (GstBin *bin,
- GstElementState state,
- GtkType type);
+ GstElementState state,
+ GtkType type);
void gst_bin_iterate (GstBin *bin);
// hack FIXME
-void gst_bin_use_cothreads (GstBin *bin, gboolean enabled);
+void gst_bin_use_cothreads (GstBin *bin,
+ gboolean enabled);
#ifdef __cplusplus
}
element->current_state = GST_STATE_NULL;
element->pending_state = -1;
element->numpads = 0;
+ element->numsrcpads = 0;
+ element->numsinkpads = 0;
element->pads = NULL;
element->loopfunc = NULL;
element->threadstate = NULL;
/* add it to the list */
element->pads = g_list_append (element->pads, pad);
element->numpads++;
+ if (gst_pad_get_direction (pad) == GST_PAD_SRC)
+ element->numsrcpads++;
+ else
+ element->numsinkpads++;
/* emit the NEW_PAD signal */
// g_print("emitting NEW_PAD signal, \"%s\"!\n",gst_pad_get_name(pad));
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_ELEMENT))
typedef enum {
- GST_ELEMENT_MULTI_IN = GST_OBJECT_FLAG_LAST,
+ // element is complex (for some def.) and generally require a cothread
+ GST_ELEMENT_COMPLEX = GST_OBJECT_FLAG_LAST,
+ // not to be scheduled directly, let others trigger all events
+ GST_ELEMENT_SCHEDULE_PASSIVELY,
+ // this element should be placed in a thread if at all possible
GST_ELEMENT_THREAD_SUGGESTED,
+ // this element is incable of seeking (FIXME: does this apply to filters?)
GST_ELEMENT_NO_SEEK,
+ // there is a new loopfunction ready for placement
GST_ELEMENT_NEW_LOOPFUNC,
+ // the cothread holding this element needs to be stopped
GST_ELEMENT_COTHREAD_STOPPING,
/* use some padding for future expansion */
GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 8,
} GstElementFlags;
-#define GST_ELEMENT_IS_MULTI_IN(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_MULTI_IN))
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING))
cothread_state *threadstate;
guint16 numpads;
+ guint16 numsrcpads;
+ guint16 numsinkpads;
GList *pads;
GstElement *manager;
* pad is the sink pad (so the same chain function can be used for N pads)
* buf is the buffer being passed */
typedef void (*GstPadChainFunction) (GstPad *pad,GstBuffer *buf);
-typedef void (*GstPadGetFunction) (GstPad *pad);
-typedef void (*GstPadGetRegionFunction) (GstPad *pad, gulong offset, gulong size);
+typedef GstBuffer *(*GstPadGetFunction) (GstPad *pad);
+typedef GstBuffer *(*GstPadGetRegionFunction) (GstPad *pad, gulong offset, gulong size);
typedef void (*GstPadQoSFunction) (GstPad *pad, glong qos_message);
typedef void (*GstPadPushFunction) (GstPad *pad, GstBuffer *buf);
static void
gst_pipeline_init (GstPipeline *pipeline)
{
+ // we're a manager by default
+ GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER);
+
pipeline->src = NULL;
pipeline->sinks = NULL;
}
base_factories = g_new0(GList *, numsinks);
i = 0;
- // fase 2, loop over all the sinks..
+ // fase 2, loop over all the sinks..
while (elements) {
GList *pads;
GstPad *pad;
static void
gst_thread_init (GstThread *thread)
{
+ // we're a manager by default
+ GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
+
+ // default is to create a thread
GST_FLAG_SET (thread, GST_THREAD_CREATE);
thread->lock = g_mutex_new();
static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_asyncdisksrc_get (GstPad *pad);
-static void gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size);
+static GstBuffer * gst_asyncdisksrc_get (GstPad *pad);
+static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size);
static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element);
*
* Push a new buffer from the asyncdisksrc at the current offset.
*/
-static void
+static GstBuffer *
gst_asyncdisksrc_get (GstPad *pad)
{
GstAsyncDiskSrc *src;
GstBuffer *buf;
- g_return_if_fail (pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad));
- g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN));
-
+ g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL);
+
/* deal with EOF state */
if (src->curoffset >= src->size) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
-
- g_return_if_fail (buf != NULL);
+
+ g_return_val_if_fail (buf != NULL, NULL);
/* simply set the buffer to point to the correct region of the file */
GST_BUFFER_DATA (buf) = src->map + src->curoffset;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE);
- if ((src->curoffset + src->bytes_per_read) >
- src->size) {
+ if ((src->curoffset + src->bytes_per_read) > src->size) {
GST_BUFFER_SIZE (buf) = src->size - src->curoffset;
// FIXME: set the buffer's EOF bit here
} else
src->new_seek = FALSE;
}
- /* we're done, push the buffer off now */
- gst_pad_push (pad, buf);
+ /* we're done, return the buffer */
+ return buf;
}
/**
*
* Push a new buffer from the asyncdisksrc of given size at given offset.
*/
-static void
+static GstBuffer *
gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size)
{
GstAsyncDiskSrc *src;
GstBuffer *buf;
- g_return_if_fail (pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad));
- g_return_if_fail (GST_IS_ASYNCDISKSRC (src));
- g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN));
+ g_return_val_if_fail (GST_IS_ASYNCDISKSRC (src), NULL);
+ g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL);
/* deal with EOF state */
if (offset >= src->size) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf != NULL, NULL);
/* simply set the buffer to point to the correct region of the file */
GST_BUFFER_DATA (buf) = src->map + offset;
static gboolean gst_audiosrc_open_audio (GstAudioSrc *src);
static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc);
-static void gst_audiosrc_get (GstPad *pad);
+static GstBuffer * gst_audiosrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 };
audiosrc->seq = 0;
}
-void gst_audiosrc_get(GstPad *pad) {
+static GstBuffer *
+gst_audiosrc_get (GstPad *pad)
+{
GstAudioSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_AUDIOSRC(gst_pad_get_parent(pad));
// g_print("attempting to read something from soundcard\n");
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf, NULL);
GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read);
-
+
readbytes = read (src->fd,GST_BUFFER_DATA (buf),
src->bytes_per_read);
if (readbytes == 0) {
gst_src_signal_eos (GST_SRC (src));
- return;
+ return NULL;
}
GST_BUFFER_SIZE (buf) = readbytes;
// gst_buffer_add_meta(buf,GST_META(newmeta));
- gst_pad_push (pad,buf);
// g_print("pushed buffer from soundcard of %d bytes\n",readbytes);
+ return buf;
}
static void
static void gst_disksrc_close_file (GstDiskSrc *src);
-static void gst_disksrc_get (GstPad *pad);
+static GstBuffer * gst_disksrc_get (GstPad *pad);
static GstElementStateReturn gst_disksrc_change_state (GstElement *element);
}
}
-static void
+static GstBuffer *
gst_disksrc_get (GstPad *pad)
{
GstDiskSrc *src;
DEBUG("pushing %d bytes with offset %d\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
/* we're done, push the buffer off now */
- gst_pad_push (pad, buf);
- DEBUG("pushing %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
+ DEBUG("returning %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf));
+ return buf;
}
enum {
ARG_0,
- /* FILL ME */
+ ARG_NUM_SOURCES,
};
-static void gst_fakesrc_class_init (GstFakeSrcClass *klass);
-static void gst_fakesrc_init (GstFakeSrc *fakesrc);
+static void gst_fakesrc_class_init (GstFakeSrcClass *klass);
+static void gst_fakesrc_init (GstFakeSrc *fakesrc);
+
+static void gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fakesrc_get (GstPad *pad);
+static GstBuffer * gst_fakesrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 };
static void
gst_fakesrc_class_init (GstFakeSrcClass *klass)
{
+ GtkObjectClass *gtkobject_class;
GstSrcClass *gstsrc_class;
+ gtkobject_class = (GtkObjectClass*)klass;
gstsrc_class = (GstSrcClass*)klass;
parent_class = gtk_type_class (GST_TYPE_SRC);
+
+ gtk_object_add_arg_type ("GstFakeSrc::num_sources", GTK_TYPE_INT,
+ GTK_ARG_READWRITE, ARG_NUM_SOURCES);
+
+ gtkobject_class->set_arg = gst_fakesrc_set_arg;
+ gtkobject_class->get_arg = gst_fakesrc_get_arg;
}
static void gst_fakesrc_init(GstFakeSrc *fakesrc) {
- // create our output pad
- fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC);
- gst_pad_set_get_function(fakesrc->srcpad,gst_fakesrc_get);
- gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad);
+ GstPad *pad;
+
+ // set the default number of
+ fakesrc->numsrcpads = 1;
+
+ // create our first output pad
+ pad = gst_pad_new("src",GST_PAD_SRC);
+ gst_pad_set_get_function(pad,gst_fakesrc_get);
+ gst_element_add_pad(GST_ELEMENT(fakesrc),pad);
+ fakesrc->srcpads = g_slist_append(NULL,pad);
// we're ready right away, since we don't have any args...
// gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY);
}
-/**
- * gst_fakesrc_new:
- * @name: then name of the fakse source
- *
- * create a new fakesrc
- *
- * Returns: The new element.
- */
-GstElement *gst_fakesrc_new(gchar *name) {
- GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC));
- gst_element_set_name(GST_ELEMENT(fakesrc),name);
- return fakesrc;
+static void
+gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id)
+{
+ GstFakeSrc *src;
+ gint new_numsrcs;
+ GstPad *pad;
+
+ /* it's not null if we got it, but it might not be ours */
+ src = GST_FAKESRC (object);
+
+ switch(id) {
+ case ARG_NUM_SOURCES:
+ new_numsrcs = GTK_VALUE_INT (*arg);
+ if (new_numsrcs > src->numsrcpads) {
+ while (src->numsrcpads != new_numsrcs) {
+ pad = gst_pad_new(g_strdup_printf("src%d",src->numsrcpads),GST_PAD_SRC);
+ gst_pad_set_get_function(pad,gst_fakesrc_get);
+ gst_element_add_pad(GST_ELEMENT(src),pad);
+ src->srcpads = g_slist_append(src->srcpads,pad);
+ src->numsrcpads;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+static void
+gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id)
+{
+ GstFakeSrc *src;
+
+ /* it's not null if we got it, but it might not be ours */
+ g_return_if_fail (GST_IS_FAKESRC (object));
+
+ src = GST_FAKESRC (object);
+
+ switch (id) {
+ case ARG_NUM_SOURCES:
+ GTK_VALUE_INT (*arg) = src->numsrcpads;
+ break;
+ default:
+ arg->type = GTK_TYPE_INVALID;
+ break;
+ }
}
+
/**
* gst_fakesrc_get:
* @src: the faksesrc to get
*
* generate an empty buffer and push it to the next element.
*/
-void gst_fakesrc_get(GstPad *pad) {
+static GstBuffer *
+gst_fakesrc_get(GstPad *pad)
+{
GstFakeSrc *src;
GstBuffer *buf;
g_print("(%s:%s)> ",GST_DEBUG_PAD_NAME(pad));
buf = gst_buffer_new();
- gst_pad_push(pad,buf);
+ return buf;
}
struct _GstFakeSrc {
GstSrc src;
- GstPad *srcpad;
+ gint numsrcpads;
+ GSList *srcpads;
};
struct _GstFakeSrcClass {
};
-static void gst_fdsrc_class_init (GstFdSrcClass *klass);
-static void gst_fdsrc_init (GstFdSrc *fdsrc);
+static void gst_fdsrc_class_init (GstFdSrcClass *klass);
+static void gst_fdsrc_init (GstFdSrc *fdsrc);
-static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_fdsrc_get (GstPad *pad);
+static GstBuffer * gst_fdsrc_get (GstPad *pad);
static GstSrcClass *parent_class = NULL;
}
}
-void gst_fdsrc_get(GstPad *pad) {
+static GstBuffer *
+gst_fdsrc_get(GstPad *pad)
+{
GstFdSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_FDSRC(gst_pad_get_parent(pad));
/* create the buffer */
// FIXME: should eventually use a bufferpool for this
buf = gst_buffer_new ();
- g_return_if_fail (buf);
+ g_return_val_if_fail (buf, NULL);
/* allocate the space for the buffer data */
GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read);
- g_return_if_fail(GST_BUFFER_DATA(buf) != NULL);
+ g_return_val_if_fail(GST_BUFFER_DATA(buf) != NULL, NULL);
/* read it in from the file */
readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read);
if (readbytes == 0) {
gst_src_signal_eos(GST_SRC(src));
- return;
+ return NULL;
}
/* if we didn't get as many bytes as we asked for, we're at EOF */
GST_BUFFER_SIZE(buf) = readbytes;
src->curoffset += readbytes;
- /* we're done, push the buffer off now */
- gst_pad_push(pad,buf);
+ /* we're done, return the buffer */
+ return buf;
}
static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id);
static GstElementStateReturn gst_httpsrc_change_state (GstElement *element);
-static void gst_httpsrc_get (GstPad *pad);
+static GstBuffer * gst_httpsrc_get (GstPad *pad);
static gboolean gst_httpsrc_open_url (GstHttpSrc *src);
static void gst_httpsrc_close_url (GstHttpSrc *src);
httpsrc->bytes_per_read = 4096;
}
-static void gst_httpsrc_get(GstPad *pad) {
+static GstBuffer *
+gst_httpsrc_get(GstPad *pad)
+{
GstHttpSrc *src;
GstBuffer *buf;
glong readbytes;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_HTTPSRC(gst_pad_get_parent(pad));
buf = gst_buffer_new();
if (readbytes == 0) {
gst_src_signal_eos(GST_SRC(src));
- return;
+ return NULL;
}
if (readbytes < src->bytes_per_read) {
GST_BUFFER_SIZE(buf) = readbytes;
src->curoffset += readbytes;
- gst_pad_push(pad,buf);
+ return buf;
}
static gboolean
static void
gst_queue_init (GstQueue *queue)
{
+ GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY);
+
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
- gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
+ gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
- gst_pad_set_get_function (queue->srcpad, gst_queue_get);
+ gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
queue->queue = NULL;
/* we have to lock the queue since we span threads */
- DEBUG("queue: try have queue lock\n");
+// DEBUG("queue: try have queue lock\n");
GST_LOCK (queue);
- DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
- DEBUG("queue: have queue lock\n");
+// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
+// DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
gst_queue_flush (queue);
}
- DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
+// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
while (queue->level_buffers >= queue->max_buffers) {
- DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
+// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n");
GST_UNLOCK (queue);
g_mutex_lock (queue->fulllock);
g_mutex_unlock (queue->fulllock);
GST_LOCK (queue);
STATUS("%s: O+\n");
- DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
+// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
- STATUS("%s: +\n");
+// STATUS("%s: +\n");
+ g_print("(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad));
/* if we were empty, but aren't any more, signal a condition */
tosignal = (queue->level_buffers >= 0);
queue->level_buffers++;
/* we can unlock now */
- DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
+// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue);
if (tosignal) {
g_mutex_lock (queue->emptylock);
- STATUS("%s: >\n");
+// STATUS("%s: >\n");
g_cond_signal (queue->emptycond);
- STATUS("%s: >>\n");
+// STATUS("%s: >>\n");
g_mutex_unlock (queue->emptylock);
}
}
name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */
- DEBUG("queue: %s try have queue lock\n", name);
+// DEBUG("queue: %s try have queue lock\n", name);
GST_LOCK (queue);
- DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
- DEBUG("queue: %s have queue lock\n", name);
+// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
+// DEBUG("queue: %s have queue lock\n", name);
while (!queue->level_buffers) {
- STATUS("queue: %s U released lock\n");
- GST_UNLOCK (queue);
+// STATUS("queue: %s U released lock\n");
+// GST_UNLOCK (queue);
g_mutex_lock (queue->emptylock);
g_cond_wait (queue->emptycond, queue->emptylock);
g_mutex_unlock (queue->emptylock);
GST_LOCK (queue);
- STATUS("queue: %s U- getting lock\n");
+// STATUS("queue: %s U- getting lock\n");
}
front = queue->queue;
g_slist_free (front);
queue->level_buffers--;
- STATUS("%s: -\n");
+// STATUS("%s: -\n");
+ g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
tosignal = queue->level_buffers < queue->max_buffers;
GST_UNLOCK(queue);
if (tosignal) {
g_mutex_lock (queue->fulllock);
- STATUS("%s: < \n");
+// STATUS("%s: < \n");
g_cond_signal (queue->fullcond);
- STATUS("%s: << \n");
+// STATUS("%s: << \n");
g_mutex_unlock (queue->fulllock);
}
- DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
+// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
gst_pad_push (queue->srcpad, buf);
- DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
+// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
/* unlock now */
}
//static gboolean gst_sinesrc_open_audio(GstSineSrc *src);
void gst_sinesrc_sync_parms(GstSineSrc *sinesrc);
-void gst_sinesrc_get(GstPad *pad);
+static GstBuffer * gst_sinesrc_get(GstPad *pad);
static GstSrcClass *parent_class = NULL;
//static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 };
return sinesrc;
}
-void gst_sinesrc_get(GstPad *pad) {
+static GstBuffer *
+gst_sinesrc_get(GstPad *pad)
+{
GstSineSrc *src;
GstBuffer *buf;
gint16 *samples;
gint volume;
gdouble val;
- g_return_if_fail(pad != NULL);
+ g_return_val_if_fail (pad != NULL, NULL);
src = GST_SINESRC(gst_pad_get_parent(pad));
buf = gst_buffer_new();
- g_return_if_fail(buf);
+ g_return_val_if_fail (buf, NULL);
GST_BUFFER_DATA(buf) = (gpointer)malloc(4096);
samples = (gint16*)GST_BUFFER_DATA(buf);
GST_BUFFER_SIZE(buf) = 4096;
src->sentmeta = TRUE;
}
- gst_pad_push(pad,buf);
g_print(">");
+ return buf;
}
static void gst_sinesrc_set_arg(GtkObject *object,GtkArg *arg,guint id) {