From 04c360e39ffcc9d408be1e3ca3bcb520ba6bc4ce Mon Sep 17 00:00:00 2001 From: Erik Walthinsen Date: Mon, 4 Dec 2000 10:52:30 +0000 Subject: [PATCH] Changed the way things are scheduled, especially sources. A Src used to have a push() function, and optionally a pus... Original commit message from CVS: Changed the way things are scheduled, especially sources. A Src used to have a push() function, and optionally a pushregion() to deal with async reads, etc. That whole thing has gone away, in favor of providing a pull() function for the output (Src) pad instead, ala chain functions. This makes constructing cothreaded schedules out of non-loop elements somewhat easier. Basically there was always a question as to which pad was being dealt with. In the pullregion case, cothread-specific data was used to try to pass the region struct to the right place, which is a slow hack. And in general, the push function severely limited the kind of tricks that could be played when there's more than one output pad, such as a multi-out file reader with async capabilities on each pad independently. This changes the way cothread scheduling occurs. Instead of the hack to deal with Src's by calling their push() function (or optionally the pushregion(), in certain cases), we now are working towards a general mechanism where pads are the only thing that are dealt with directly. An optimization was made in the process of doing this: the loopfunction actually run as the outer [stack] frame of the cothread is now set more intelligently in create_plan() based on what kind of element it is. We now have: loopfunc_wrapper: used for loop-based elements, it simply calls the loopfunc in a loop, paying attention to COTHREAD_STOPPING (see below). It currently does other, soon to be depracated, stuff. pullsrc_wrapper: wraps a Src that's not loop-based (since your options are now loop- or pull-based) There will be a couple more to deal with other cases, such as Connections and chain-based elements. The general idea is that it's a lot more efficient to make the decisions once in create_plan than to keep doing this huge if/else chain in the wrapper. Just choose the right wrapper up front. It'll be most apparent performance-wise in the case of whichever element context is switched to first for each iteration, since the whole wrapper setup is done for every iteration. The tricky part is that there is now a bit of overloading of the function pointers in a pad. The current meanings (possibly to change a bit more soon) are: chainfunc: as always, chainfunc pointer is mirrored between peer pads (this may change, and the chain func may end up in pushfunc) pushfunc: SrcPad: gst_pad_pushfunc_proxy, cothread_switch to peer SinkPad: none (may take over chainfunc, see below) pullfunc: SrcPad: Src or Connection's function to construct buffers SinkPad: gst_pad_pullfunc_proxy, cothread_switch to peer There are a number of issues remaining with the scheduling, not the least of which is the fact that Connections are still dealt with the old way, with _push() functions and such. I'm trying to figure out a way to unify the system so it makes sense. Following the scheduling system is hard enough, trying to change it is murder. Another useful scheduling addition, mentioned above, is COTHREAD_STOPPING. It's an element flag that's used to signal whatever code is running in cothread context that it should be finishing up and exiting soon. An example of this is in plugins/cobin/spindentity.c. All the loops should now be composed of do/while loops, rather than while(1) loops: do { buf = gst_pad_pull(spindentity->sinkpad); gst_pad_push(spindentity->srcpad,buf); } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); The reason for this is that COTHREAD_STOPPING may be set before the above loop ever gets started. It wouldn't do for the body of the loop to never once get called, that would simply stall the pipeline. Note that only the core library code is ever responsible for setting and unsetting this flag. All elements have to do is respond to it by cleanly exiting the loop and the function holding it. This is needed primarily to allow iterations to occur properly. Basically, there's a single entry point in the cothread scheduling loop, gst_bin_iterate_func() simply switches to this cothread. If the element in this context is allowed to loop infinitely, nothing would even switch back to the context from which the iterate() was originally called. This is a bit of a problem. The solution is for there to be an implicit switch back to the originating context. Now, even I'm not sure exactly how this works, but if the cothread that's switched to actually returns, execution returns back to the calling context, i.e. iterate_func(). COTHREAD_STOPPING is therefore set just before switching into this (currently randomly chosen) context, on the assumption that it will return promptly after finishing its duties. The burden of clearing the flag falls to the various wrapper functions provided by the Bin code, thus element writers don't have to worry about doing that at all (and simply shouldn't). Related changes: All the sources in elements/ have been changed to reflect the new system. FIXMEs: 1) gstpipeline.c calls gst_src_push at some point, dunno why, it's commented out now. 2) any other sources, including vcdsrc, dvdsrc, and v4lsrc will break badly and need to be modified to work as pull-based sources. --- gst/elements/gstasyncdisksrc.c | 72 ++++++++-------- gst/elements/gstaudiosrc.c | 32 ++++--- gst/elements/gstdisksrc.c | 35 ++++---- gst/elements/gstfakesrc.c | 53 +++++++----- gst/elements/gstfdsrc.c | 48 ++++------- gst/elements/gsthttpsrc.c | 55 +++++------- gst/elements/gstsinesrc.c | 26 +++--- gst/gstbin.c | 167 ++++++++++++++++++++++--------------- gst/gstelement.h | 6 +- gst/gstpad.c | 16 ++-- gst/gstpad.h | 7 +- gst/gstpipeline.c | 6 +- gst/gstsrc.c | 45 ---------- gst/gstsrc.h | 15 +--- plugins/elements/gstasyncdisksrc.c | 72 ++++++++-------- plugins/elements/gstaudiosrc.c | 32 ++++--- plugins/elements/gstdisksrc.c | 35 ++++---- plugins/elements/gstfakesrc.c | 53 +++++++----- plugins/elements/gstfdsrc.c | 48 ++++------- plugins/elements/gsthttpsrc.c | 55 +++++------- plugins/elements/gstsinesrc.c | 26 +++--- 21 files changed, 417 insertions(+), 487 deletions(-) diff --git a/gst/elements/gstasyncdisksrc.c b/gst/elements/gstasyncdisksrc.c index 57cc0dc..6de1348 100644 --- a/gst/elements/gstasyncdisksrc.c +++ b/gst/elements/gstasyncdisksrc.c @@ -57,8 +57,8 @@ static void gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc); static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_asyncdisksrc_push (GstSrc *src); -static void gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size); +static void gst_asyncdisksrc_pull (GstPad *pad); +static void gst_asyncdisksrc_pull_region (GstPad *pad, gulong offset, gulong size); static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element); @@ -113,9 +113,6 @@ gst_asyncdisksrc_class_init (GstAsyncDiskSrcClass *klass) gtkobject_class->get_arg = gst_asyncdisksrc_get_arg; gstelement_class->change_state = gst_asyncdisksrc_change_state; - - gstsrc_class->push = gst_asyncdisksrc_push; - gstsrc_class->push_region = gst_asyncdisksrc_push_region; } static void @@ -124,6 +121,8 @@ gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc) GST_SRC_SET_FLAGS (asyncdisksrc, GST_SRC_ASYNC); asyncdisksrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function (asyncdisksrc->srcpad,gst_asyncdisksrc_pull); + // FIXME must set pullregion gst_element_add_pad (GST_ELEMENT (asyncdisksrc), asyncdisksrc->srcpad); asyncdisksrc->filename = NULL; @@ -204,26 +203,24 @@ gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } /** - * gst_asyncdisksrc_push: - * @src: #GstSrc to push a buffer from + * gst_asyncdisksrc_pull: + * @pad: #GstPad to push a buffer from * * Push a new buffer from the asyncdisksrc at the current offset. */ static void -gst_asyncdisksrc_push (GstSrc *src) +gst_asyncdisksrc_pull (GstPad *pad) { - GstAsyncDiskSrc *asyncdisksrc; + GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); + g_return_if_fail (pad != NULL); + src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - asyncdisksrc = GST_ASYNCDISKSRC (src); - /* deal with EOF state */ - if (asyncdisksrc->curoffset >= asyncdisksrc->size) { - gst_src_signal_eos (GST_SRC (asyncdisksrc)); + if (src->curoffset >= src->size) { + gst_src_signal_eos (GST_SRC (src)); return; } @@ -234,30 +231,30 @@ gst_asyncdisksrc_push (GstSrc *src) g_return_if_fail (buf != NULL); /* simply set the buffer to point to the correct region of the file */ - GST_BUFFER_DATA (buf) = asyncdisksrc->map + asyncdisksrc->curoffset; - GST_BUFFER_OFFSET (buf) = asyncdisksrc->curoffset; + GST_BUFFER_DATA (buf) = src->map + src->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((asyncdisksrc->curoffset + asyncdisksrc->bytes_per_read) > - asyncdisksrc->size) { - GST_BUFFER_SIZE (buf) = asyncdisksrc->size - asyncdisksrc->curoffset; + if ((src->curoffset + src->bytes_per_read) > + src->size) { + GST_BUFFER_SIZE (buf) = src->size - src->curoffset; // FIXME: set the buffer's EOF bit here } else - GST_BUFFER_SIZE (buf) = asyncdisksrc->bytes_per_read; + GST_BUFFER_SIZE (buf) = src->bytes_per_read; - asyncdisksrc->curoffset += GST_BUFFER_SIZE (buf); + src->curoffset += GST_BUFFER_SIZE (buf); - if (asyncdisksrc->new_seek) { + if (src->new_seek) { GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLUSH); - asyncdisksrc->new_seek = FALSE; + src->new_seek = FALSE; } /* we're done, push the buffer off now */ - gst_pad_push (asyncdisksrc->srcpad, buf); + gst_pad_push (pad, buf); } /** - * gst_asyncdisksrc_push_region: + * gst_asyncdisksrc_pull_region: * @src: #GstSrc to push a buffer from * @offset: offset in file * @size: number of bytes @@ -265,20 +262,21 @@ gst_asyncdisksrc_push (GstSrc *src) * Push a new buffer from the asyncdisksrc of given size at given offset. */ static void -gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size) +gst_asyncdisksrc_pull_region (GstPad *pad, gulong offset, gulong size) { - GstAsyncDiskSrc *asyncdisksrc; + GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); + g_return_if_fail (pad != NULL); + + src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); + g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - asyncdisksrc = GST_ASYNCDISKSRC (src); - /* deal with EOF state */ - if (offset >= asyncdisksrc->size) { - gst_src_signal_eos (GST_SRC (asyncdisksrc)); + if (offset >= src->size) { + gst_src_signal_eos (GST_SRC (src)); return; } @@ -288,18 +286,18 @@ gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size) g_return_if_fail (buf); /* simply set the buffer to point to the correct region of the file */ - GST_BUFFER_DATA (buf) = asyncdisksrc->map + offset; + GST_BUFFER_DATA (buf) = src->map + offset; GST_BUFFER_OFFSET (buf) = offset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((offset + size) > asyncdisksrc->size) { - GST_BUFFER_SIZE (buf) = asyncdisksrc->size - offset; + if ((offset + size) > src->size) { + GST_BUFFER_SIZE (buf) = src->size - offset; // FIXME: set the buffer's EOF bit here } else GST_BUFFER_SIZE (buf) = size; /* we're done, push the buffer off now */ - gst_pad_push (asyncdisksrc->srcpad,buf); + gst_pad_push (pad,buf); } diff --git a/gst/elements/gstaudiosrc.c b/gst/elements/gstaudiosrc.c index ee50978..6e89a77 100644 --- a/gst/elements/gstaudiosrc.c +++ b/gst/elements/gstaudiosrc.c @@ -64,7 +64,7 @@ static void gst_audiosrc_close_audio (GstAudioSrc *src); static gboolean gst_audiosrc_open_audio (GstAudioSrc *src); static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc); -static void gst_audiosrc_push (GstSrc *src); +static void gst_audiosrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 }; @@ -118,14 +118,13 @@ gst_audiosrc_class_init (GstAudioSrcClass *klass) gtkobject_class->get_arg = gst_audiosrc_get_arg; gstelement_class->change_state = gst_audiosrc_change_state; - - gstsrc_class->push = gst_audiosrc_push; } static void gst_audiosrc_init (GstAudioSrc *audiosrc) { audiosrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function(audiosrc->srcpad,gst_audiosrc_pull); gst_element_add_pad (GST_ELEMENT (audiosrc), audiosrc->srcpad); audiosrc->fd = -1; @@ -141,40 +140,37 @@ gst_audiosrc_init (GstAudioSrc *audiosrc) audiosrc->seq = 0; } -static void -gst_audiosrc_push (GstSrc *src) -{ - GstAudioSrc *audiosrc; +void gst_audiosrc_pull(GstPad *pad) { + GstAudioSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_AUDIOSRC (src)); - audiosrc = GST_AUDIOSRC (src); + g_return_if_fail(pad != NULL); + src = GST_AUDIOSRC(gst_pad_get_parent(pad)); // g_print("attempting to read something from soundcard\n"); buf = gst_buffer_new (); g_return_if_fail (buf); - GST_BUFFER_DATA (buf) = (gpointer)g_malloc (audiosrc->bytes_per_read); - - readbytes = read (audiosrc->fd,GST_BUFFER_DATA (buf), - audiosrc->bytes_per_read); + GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read); + readbytes = read (src->fd,GST_BUFFER_DATA (buf), + src->bytes_per_read); + if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (audiosrc)); + gst_src_signal_eos (GST_SRC (src)); return; } GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_OFFSET (buf) = audiosrc->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; - audiosrc->curoffset += readbytes; + src->curoffset += readbytes; // gst_buffer_add_meta(buf,GST_META(newmeta)); - gst_pad_push (audiosrc->srcpad,buf); + gst_pad_push (pad,buf); // g_print("pushed buffer from soundcard of %d bytes\n",readbytes); } diff --git a/gst/elements/gstdisksrc.c b/gst/elements/gstdisksrc.c index 171ec35..e985df3 100644 --- a/gst/elements/gstdisksrc.c +++ b/gst/elements/gstdisksrc.c @@ -60,7 +60,7 @@ static void gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_disksrc_close_file (GstDiskSrc *src); -static void gst_disksrc_push (GstSrc *src); +static void gst_disksrc_pull (GstPad *pad); static GstElementStateReturn gst_disksrc_change_state (GstElement *element); @@ -114,16 +114,13 @@ gst_disksrc_class_init (GstDiskSrcClass *klass) gtkobject_class->get_arg = gst_disksrc_get_arg; gstelement_class->change_state = gst_disksrc_change_state; - - gstsrc_class->push = gst_disksrc_push; - /* we nominally can't (won't) do async */ - gstsrc_class->push_region = NULL; } static void gst_disksrc_init (GstDiskSrc *disksrc) { disksrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function(disksrc->srcpad,gst_disksrc_pull); gst_element_add_pad (GST_ELEMENT (disksrc), disksrc->srcpad); disksrc->filename = NULL; @@ -205,18 +202,16 @@ gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } static void -gst_disksrc_push (GstSrc *src) +gst_disksrc_pull (GstPad *pad) { - GstDiskSrc *disksrc; + GstDiskSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_DISKSRC (src)); + g_return_if_fail (pad != NULL); + src = GST_DISKSRC(gst_pad_get_parent(pad)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_DISKSRC_OPEN)); g_return_if_fail (GST_STATE (src) >= GST_STATE_READY); - - disksrc = GST_DISKSRC (src); /* create the buffer */ // FIXME: should eventually use a bufferpool for this @@ -224,39 +219,39 @@ gst_disksrc_push (GstSrc *src) g_return_if_fail (buf); /* allocate the space for the buffer data */ - GST_BUFFER_DATA (buf) = g_malloc (disksrc->bytes_per_read); + GST_BUFFER_DATA (buf) = g_malloc (src->bytes_per_read); g_return_if_fail (GST_BUFFER_DATA (buf) != NULL); /* read it in from the file */ - readbytes = read (disksrc->fd, GST_BUFFER_DATA (buf), disksrc->bytes_per_read); + readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->bytes_per_read); if (readbytes == -1) { perror ("read()"); gst_buffer_unref (buf); return; } else if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (disksrc)); + gst_src_signal_eos (GST_SRC (src)); gst_buffer_unref (buf); return; } /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < disksrc->bytes_per_read) + if (readbytes < src->bytes_per_read) GST_BUFFER_FLAG_SET (buf, GST_BUFFER_EOS); /* if we have a new buffer froma seek, mark it */ - if (disksrc->new_seek) { + if (src->new_seek) { GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLUSH); - disksrc->new_seek = FALSE; + src->new_seek = FALSE; } - GST_BUFFER_OFFSET (buf) = disksrc->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_SIZE (buf) = readbytes; - disksrc->curoffset += readbytes; + src->curoffset += readbytes; DEBUG("pushing with offset %d\n", GST_BUFFER_OFFSET (buf)); /* we're done, push the buffer off now */ - gst_pad_push (disksrc->srcpad, buf); + gst_pad_push (pad, buf); DEBUG("pushing with offset %d done\n", GST_BUFFER_OFFSET (buf)); } diff --git a/gst/elements/gstfakesrc.c b/gst/elements/gstfakesrc.c index 3abe707..47688a3 100644 --- a/gst/elements/gstfakesrc.c +++ b/gst/elements/gstfakesrc.c @@ -46,7 +46,7 @@ enum { static void gst_fakesrc_class_init (GstFakeSrcClass *klass); static void gst_fakesrc_init (GstFakeSrc *fakesrc); -static void gst_fakesrc_push (GstSrc *src); +static void gst_fakesrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 }; @@ -80,42 +80,49 @@ gst_fakesrc_class_init (GstFakeSrcClass *klass) gstsrc_class = (GstSrcClass*)klass; parent_class = gtk_type_class (GST_TYPE_SRC); - - gstsrc_class->push = gst_fakesrc_push; - gstsrc_class->push_region = NULL; } -static void -gst_fakesrc_init (GstFakeSrc *fakesrc) -{ - fakesrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (fakesrc), fakesrc->srcpad); +static void gst_fakesrc_init(GstFakeSrc *fakesrc) { + // create our output pad + fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(fakesrc->srcpad,gst_fakesrc_pull); + gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad); // we're ready right away, since we don't have any args... // gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY); } /** - * gst_fakesrc_push: - * @src: the faksesrc to push + * gst_fakesrc_new: + * @name: then name of the fakse source + * + * create a new fakesrc + * + * Returns: The new element. + */ +GstElement *gst_fakesrc_new(gchar *name) { + GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC)); + gst_element_set_name(GST_ELEMENT(fakesrc),name); + return fakesrc; +} + +/** + * gst_fakesrc_pull: + * @src: the faksesrc to pull * * generate an empty buffer and push it to the next element. */ -static void -gst_fakesrc_push (GstSrc *src) -{ - GstFakeSrc *fakesrc; +void gst_fakesrc_pull(GstPad *pad) { + GstFakeSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_FAKESRC (src)); - - fakesrc = GST_FAKESRC (src); + g_return_if_fail(pad != NULL); + src = GST_FAKESRC(gst_pad_get_parent(pad)); + g_return_if_fail(GST_IS_FAKESRC(src)); // g_print("gst_fakesrc_push(): pushing fake buffer from '%s'\n", -// gst_element_get_name(GST_ELEMENT(fakesrc))); +// gst_element_get_name(GST_ELEMENT(src))); g_print(">"); - buf = gst_buffer_new (); - - gst_pad_push (fakesrc->srcpad, buf); + buf = gst_buffer_new(); + gst_pad_push(pad,buf); } diff --git a/gst/elements/gstfdsrc.c b/gst/elements/gstfdsrc.c index 790eed2..5d33755 100644 --- a/gst/elements/gstfdsrc.c +++ b/gst/elements/gstfdsrc.c @@ -57,8 +57,7 @@ static void gst_fdsrc_init (GstFdSrc *fdsrc); static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_push (GstSrc *src); -//static void gst_fdsrc_push_region(GstSrc *src,gulong offset,gulong size); +static void gst_fdsrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; @@ -105,18 +104,12 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) gtkobject_class->set_arg = gst_fdsrc_set_arg; gtkobject_class->get_arg = gst_fdsrc_get_arg; - - gstsrc_class->push = gst_fdsrc_push; - - /* we nominally can't (won't) do async */ - gstsrc_class->push_region = NULL; } -static void -gst_fdsrc_init (GstFdSrc *fdsrc) -{ - fdsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (fdsrc), fdsrc->srcpad); +static void gst_fdsrc_init(GstFdSrc *fdsrc) { + fdsrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(fdsrc->srcpad,gst_fdsrc_pull); + gst_element_add_pad(GST_ELEMENT(fdsrc),fdsrc->srcpad); fdsrc->fd = 0; fdsrc->curoffset = 0; @@ -182,17 +175,13 @@ gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -static void -gst_fdsrc_push (GstSrc *src) -{ - GstFdSrc *fdsrc; +void gst_fdsrc_pull(GstPad *pad) { + GstFdSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_FDSRC (src)); - - fdsrc = GST_FDSRC (src); + g_return_if_fail(pad != NULL); + src = GST_FDSRC(gst_pad_get_parent(pad)); /* create the buffer */ // FIXME: should eventually use a bufferpool for this @@ -200,26 +189,25 @@ gst_fdsrc_push (GstSrc *src) g_return_if_fail (buf); /* allocate the space for the buffer data */ - GST_BUFFER_DATA (buf) = g_malloc (fdsrc->bytes_per_read); - g_return_if_fail (GST_BUFFER_DATA (buf) != NULL); + GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read); + g_return_if_fail(GST_BUFFER_DATA(buf) != NULL); /* read it in from the file */ - readbytes = read (fdsrc->fd, GST_BUFFER_DATA (buf), fdsrc->bytes_per_read); + readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (fdsrc)); + gst_src_signal_eos(GST_SRC(src)); return; } /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < fdsrc->bytes_per_read) { + if (readbytes < src->bytes_per_read) { // set the buffer's EOF bit here GST_BUFFER_FLAG_SET (buf, GST_BUFFER_EOS); } - GST_BUFFER_OFFSET (buf) = fdsrc->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - - fdsrc->curoffset += readbytes; + GST_BUFFER_OFFSET(buf) = src->curoffset; + GST_BUFFER_SIZE(buf) = readbytes; + src->curoffset += readbytes; /* we're done, push the buffer off now */ - gst_pad_push (fdsrc->srcpad, buf); + gst_pad_push(pad,buf); } diff --git a/gst/elements/gsthttpsrc.c b/gst/elements/gsthttpsrc.c index c805f73..ff27e7f 100644 --- a/gst/elements/gsthttpsrc.c +++ b/gst/elements/gsthttpsrc.c @@ -48,19 +48,18 @@ enum { ARG_OFFSET }; - static void gst_httpsrc_class_init (GstHttpSrcClass *klass); static void gst_httpsrc_init (GstHttpSrc *httpsrc); static void gst_httpsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); +static GstElementStateReturn gst_httpsrc_change_state (GstElement *element); -static void gst_httpsrc_push (GstSrc *src); +static void gst_httpsrc_pull (GstPad *pad); -static gboolean gst_httpsrc_open_url (GstHttpSrc *src); -static void gst_httpsrc_close_url (GstHttpSrc *src); +static gboolean gst_httpsrc_open_url (GstHttpSrc *src); +static void gst_httpsrc_close_url (GstHttpSrc *src); -static GstElementStateReturn gst_httpsrc_change_state(GstElement *element); static GstSrcClass *parent_class = NULL; //static guint gst_httpsrc_signals[LAST_SIGNAL] = { 0 }; @@ -109,16 +108,12 @@ gst_httpsrc_class_init (GstHttpSrcClass *klass) gtkobject_class->get_arg = gst_httpsrc_get_arg; gstelement_class->change_state = gst_httpsrc_change_state; - - gstsrc_class->push = gst_httpsrc_push; - gstsrc_class->push_region = NULL; } -static void -gst_httpsrc_init (GstHttpSrc *httpsrc) -{ - httpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (httpsrc), httpsrc->srcpad); +static void gst_httpsrc_init(GstHttpSrc *httpsrc) { + httpsrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(httpsrc->srcpad,gst_httpsrc_pull); + gst_element_add_pad(GST_ELEMENT(httpsrc),httpsrc->srcpad); httpsrc->url = NULL; httpsrc->request = NULL; @@ -127,37 +122,31 @@ gst_httpsrc_init (GstHttpSrc *httpsrc) httpsrc->bytes_per_read = 4096; } -static void -gst_httpsrc_push (GstSrc *src) -{ - GstHttpSrc *httpsrc; +static void gst_httpsrc_push(GstPad *pad) { + GstHttpSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_HTTPSRC (src)); -// g_return_if_fail(GST_FLAG_IS_SET(src,GST_)); - httpsrc = GST_HTTPSRC (src); + g_return_if_fail(pad != NULL); + src = GST_HTTPSRC(gst_pad_get_parent(pad)); + + buf = gst_buffer_new(); + GST_BUFFER_DATA(buf) = (gpointer)malloc(src->bytes_per_read); + readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); - buf = gst_buffer_new (); - GST_BUFFER_DATA (buf) = (gpointer)malloc (httpsrc->bytes_per_read); - - readbytes = read (httpsrc->fd, GST_BUFFER_DATA (buf), httpsrc->bytes_per_read); - if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (httpsrc)); + gst_src_signal_eos(GST_SRC(src)); return; } - if (readbytes < httpsrc->bytes_per_read) { + if (readbytes < src->bytes_per_read) { // FIXME: set the buffer's EOF bit here } - GST_BUFFER_OFFSET (buf) = httpsrc->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - - httpsrc->curoffset += readbytes; + GST_BUFFER_OFFSET(buf) = src->curoffset; + GST_BUFFER_SIZE(buf) = readbytes; + src->curoffset += readbytes; - gst_pad_push (httpsrc->srcpad, buf); + gst_pad_push(pad,buf); } static gboolean diff --git a/gst/elements/gstsinesrc.c b/gst/elements/gstsinesrc.c index cf302d0..271e0b3 100644 --- a/gst/elements/gstsinesrc.c +++ b/gst/elements/gstsinesrc.c @@ -62,7 +62,7 @@ static void gst_sinesrc_get_arg(GtkObject *object,GtkArg *arg,guint id); //static gboolean gst_sinesrc_open_audio(GstSineSrc *src); void gst_sinesrc_sync_parms(GstSineSrc *sinesrc); -void gst_sinesrc_push(GstSrc *src); +void gst_sinesrc_pull(GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 }; @@ -112,12 +112,11 @@ gst_sinesrc_class_init(GstSineSrcClass *klass) { gtkobject_class->get_arg = gst_sinesrc_get_arg; // gstelement_class->change_state = gst_sinesrc_change_state; - - gstsrc_class->push = gst_sinesrc_push; } static void gst_sinesrc_init(GstSineSrc *sinesrc) { sinesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(sinesrc->srcpad,gst_sinesrc_pull); gst_element_add_pad(GST_ELEMENT(sinesrc),sinesrc->srcpad); sinesrc->volume = 1.0; @@ -143,17 +142,16 @@ GstElement *gst_sinesrc_new_with_fd(gchar *name,gchar *filename) { return sinesrc; } -void gst_sinesrc_push(GstSrc *src) { - GstSineSrc *sinesrc; +void gst_sinesrc_pull(GstPad *pad) { + GstSineSrc *src; GstBuffer *buf; gint16 *samples; gint i; gint volume; gdouble val; - g_return_if_fail(src != NULL); - g_return_if_fail(GST_IS_SINESRC(src)); - sinesrc = GST_SINESRC(src); + g_return_if_fail(pad != NULL); + src = GST_SINESRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); g_return_if_fail(buf); @@ -161,21 +159,21 @@ void gst_sinesrc_push(GstSrc *src) { samples = (gint16*)GST_BUFFER_DATA(buf); GST_BUFFER_SIZE(buf) = 4096; - volume = 65535 * sinesrc->volume; + volume = 65535 * src->volume; for (i=0;i<1024;i++) { - val = sin((gdouble)i/sinesrc->frequency); + val = sin((gdouble)i/src->frequency); samples[i] = val * volume; samples[i+1] = samples[i]; } - if (!sinesrc->sentmeta) { + if (!src->sentmeta) { MetaAudioRaw *newmeta = g_new(MetaAudioRaw,1); - memcpy(newmeta,&sinesrc->meta,sizeof(MetaAudioRaw)); + memcpy(newmeta,&src->meta,sizeof(MetaAudioRaw)); gst_buffer_add_meta(buf,GST_META(newmeta)); - sinesrc->sentmeta = TRUE; + src->sentmeta = TRUE; } - gst_pad_push(sinesrc->srcpad,buf); + gst_pad_push(pad,buf); g_print(">"); } diff --git a/gst/gstbin.c b/gst/gstbin.c index 4f7c670..da9b10b 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -17,7 +17,6 @@ * Boston, MA 02111-1307, USA. */ -//#define DEBUG_ENABLED #include #include "config.h" @@ -123,7 +122,8 @@ gst_bin_init (GstBin *bin) { bin->numchildren = 0; bin->children = NULL; - bin->use_cothreads = TRUE; +// FIXME temporary testing measure +// bin->use_cothreads = TRUE; } /** @@ -213,7 +213,7 @@ gst_bin_change_state (GstElement *element) GstElement *child; g_return_val_if_fail (GST_IS_BIN (element), GST_STATE_FAILURE); - + bin = GST_BIN (element); g_print("gst_bin_change_state(\"%s\"): currently %d(%s), %d(%s) pending\n", @@ -479,12 +479,14 @@ gst_bin_iterate (GstBin *bin) { GstBinClass *oclass; - oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass); + DEBUG_ENTER("('%s')",gst_element_get_name(GST_ELEMENT(bin))); - DEBUG("gst_bin_iterate()\n"); + oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass); if (oclass->iterate) (oclass->iterate) (bin); + + DEBUG_LEAVE("('%s')",gst_element_get_name(GST_ELEMENT(bin))); } /** @@ -518,90 +520,104 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[]) GstBuffer *buf; G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); - DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", - argc,gst_element_get_name (element)); + DEBUG_ENTER("(%d,'%s')",argc,name); + +// DEBUG("entering gst_bin_loopfunc_wrapper(%d,\"%s\")\n", +// argc,gst_element_get_name (element)); if (element->loopfunc != NULL) { - while (1) { - DEBUG("** gst_bin_loopfunc_wrapper(): element %s has loop function, calling it\n", name); - (element->loopfunc) (element); - DEBUG("** gst_bin_loopfunc_wrapper(): element %s ended loop function\n", name); - } + DEBUG("element %s has loop function, calling it\n", name); + (element->loopfunc) (element); + DEBUG("element %s ended loop function\n", name); } else { - DEBUG("** gst_bin_loopfunc_wrapper(): element %s is chain-based, calling in infinite loop\n", name); - if (GST_IS_SRC (element)) { - region_struct *region = cothread_get_data (element->threadstate, "region"); - if (region) { - gst_src_push_region (GST_SRC (element), region->offset, region->size); - } - else { - DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s\n", name); - gst_src_push (GST_SRC (element)); - DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s done\n", name); - } - } else if (GST_IS_CONNECTION (element) && argc == 1) { + DEBUG("element %s is chain-based\n", name); + if (GST_IS_CONNECTION (element) && argc == 1) { while (1) { - DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s\n", name); + DEBUG("calling push function of connection %s\n", name); gst_connection_push (GST_CONNECTION (element)); - DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s done\n", name); + DEBUG("calling push function of connection %s done\n", name); } } else { - while (1) { + DEBUG("stepping through pads\n"); + do { pads = element->pads; while (pads) { pad = GST_PAD (pads->data); if (pad->direction == GST_PAD_SINK) { - DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); + DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); buf = gst_pad_pull (pad); - DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); + DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); (pad->chainfunc) (pad,buf); - DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); + DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); } pads = g_list_next (pads); } - } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); } } + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE("(%d,'%s')",argc,name); + return 0; +} + +static int +gst_bin_pullsrc_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + GList *pads; + GstPad *pad; + GstBuffer *buf; + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + + DEBUG("entering gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name); + + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + if (pad->direction == GST_PAD_SRC) { + if (pad->pullfunc == NULL) fprintf(stderr,"error, no pullfunc\n"); + (pad->pullfunc)(pad); + } + pads = g_list_next(pads); + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG("leaving gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name); return 0; } static void -gst_bin_pullfunc_wrapper (GstPad *pad) +gst_bin_pullfunc_proxy (GstPad *pad) { - DEBUG("** in gst_bin_pullfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); + DEBUG_ENTER("(%s)",gst_element_get_name (GST_ELEMENT (pad->parent))); cothread_switch (GST_ELEMENT (pad->parent)->threadstate); - DEBUG("** out gst_bin_pullfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); } static void -gst_bin_pullregionfunc_wrapper (GstPad *pad, +gst_bin_pullregionfunc_proxy (GstPad *pad, gulong offset, gulong size) { region_struct region; + DEBUG_ENTER("%s",gst_element_get_name (GST_ELEMENT (pad->parent))); + region.offset = offset; region.size = size; - DEBUG("** in gst_bin_pullregionfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); cothread_set_data (GST_ELEMENT (pad->parent)->threadstate, "region", ®ion); cothread_switch (GST_ELEMENT (pad->parent)->threadstate); cothread_set_data (GST_ELEMENT (pad->parent)->threadstate, "region", NULL); - DEBUG("** out gst_bin_pullregionfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); } static void -gst_bin_pushfunc_wrapper (GstPad *pad) +gst_bin_pushfunc_proxy (GstPad *pad) { - DEBUG("** in gst_bin_pushfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); + DEBUG_ENTER("%s",gst_element_get_name (GST_ELEMENT (pad->parent))); cothread_switch (GST_ELEMENT (pad->parent)->threadstate); - DEBUG("** out gst_bin_pushfunc_wrapper()============================= %s\n", - gst_element_get_name (GST_ELEMENT (pad->parent))); } static void @@ -665,7 +681,9 @@ gst_bin_create_plan_func (GstBin *bin) } // FIXME - bin->need_cothreads &= bin->use_cothreads; +// bin->need_cothreads &= bin->use_cothreads; + // FIXME temporary testing measure + if (bin->use_cothreads) bin->need_cothreads = TRUE; // clear previous plan state g_list_free (bin->entries); @@ -693,14 +711,15 @@ gst_bin_create_plan_func (GstBin *bin) 0, (char **)element); } if (GST_IS_BIN (element)) { - gst_bin_create_plan (GST_BIN (element)); + gst_bin_create_plan (GST_BIN (element)); } - + if (GST_IS_SRC (element)) { g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name (element)); bin->entries = g_list_prepend (bin->entries,element); bin->numentries++; - } + cothread_setfunc(element->threadstate,gst_bin_pullsrc_wrapper,0,(char **)element); + } pads = gst_element_get_pad_list (element); while (pads) { @@ -710,10 +729,12 @@ gst_bin_create_plan_func (GstBin *bin) // an internal connection will push outside this bin. if (!GST_IS_CONNECTION (element)) { - pad->pushfunc = gst_bin_pushfunc_wrapper; + pad->pushfunc = gst_bin_pushfunc_proxy; } - pad->pullfunc = gst_bin_pullfunc_wrapper; - pad->pullregionfunc = gst_bin_pullregionfunc_wrapper; + if (!pad->pullfunc) + pad->pullfunc = gst_bin_pullfunc_proxy; + if (!pad->pullfunc) + pad->pullregionfunc = gst_bin_pullregionfunc_proxy; /* we only worry about sink pads */ if (gst_pad_get_direction (pad) == GST_PAD_SINK) { @@ -733,9 +754,9 @@ gst_bin_create_plan_func (GstBin *bin) g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n", gst_element_get_name (outside),gst_pad_get_name (opad), opad, opad->pullfunc); - opad->pushfunc = gst_bin_pushfunc_wrapper; - opad->pullfunc = gst_bin_pullfunc_wrapper; - opad->pullregionfunc = gst_bin_pullregionfunc_wrapper; + opad->pushfunc = gst_bin_pushfunc_proxy; + opad->pullfunc = gst_bin_pullfunc_proxy; + opad->pullregionfunc = gst_bin_pullregionfunc_proxy; if (outside->threadstate == NULL) { outside->threadstate = cothread_create (bin->threadcontext); @@ -815,26 +836,22 @@ gst_bin_iterate_func (GstBin *bin) { GList *entries; GstElement *entry; + GList *pads; + GstPad *pad; - DEBUG("gst_bin_iterate_func() in \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin))); + DEBUG_ENTER("(%s)",gst_element_get_name(GST_ELEMENT(bin))); g_return_if_fail (bin != NULL); g_return_if_fail (GST_IS_BIN (bin)); g_return_if_fail (GST_STATE (bin) == GST_STATE_PLAYING); - DEBUG("GstBin: iterating\n"); - if (bin->need_cothreads) { // all we really have to do is switch to the first child // FIXME this should be lots more intelligent about where to start - DEBUG("** in gst_bin_iterate_func()==================================%s\n", - gst_element_get_name (GST_ELEMENT (bin->children->data))); + GST_FLAG_SET(GST_ELEMENT (bin->children->data),GST_ELEMENT_COTHREAD_STOPPING); cothread_switch (GST_ELEMENT (bin->children->data)->threadstate); - - DEBUG("** out gst_bin_iterate_func()==================================%s\n", - gst_element_get_name (GST_ELEMENT (bin->children->data))); - + } else { if (bin->numentries <= 0) { //printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin))); @@ -849,9 +866,21 @@ gst_bin_iterate_func (GstBin *bin) while (entries) { entry = GST_ELEMENT (entries->data); - if (GST_IS_SRC (entry)) - gst_src_push (GST_SRC (entry)); - else if (GST_IS_CONNECTION (entry)) + if (GST_IS_SRC (entry)) { +// if (GST_SRC_PUSH_FUNCTION(entry)) +// gst_src_push (GST_SRC (entry)); +// else { + pads = entry->pads; + while (pads) { + pad = GST_PAD(pads->data); + if (pad->direction == GST_PAD_SRC) { + if (pad->pullfunc == NULL) fprintf(stderr,"error, no pullfunc\n"); + (pad->pullfunc)(pad); + } + pads = g_list_next(pads); + } +// } + } else if (GST_IS_CONNECTION (entry)) gst_connection_push (GST_CONNECTION (entry)); else if (GST_IS_BIN (entry)) gst_bin_iterate (GST_BIN (entry)); @@ -860,4 +889,6 @@ gst_bin_iterate_func (GstBin *bin) entries = g_list_next (entries); } } + + DEBUG_LEAVE("(%s)",gst_element_get_name(GST_ELEMENT(bin))); } diff --git a/gst/gstelement.h b/gst/gstelement.h index 23212ab..ff4e5d0 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -81,10 +81,12 @@ typedef enum { GST_ELEMENT_NO_SEEK = (1 << 6), GST_ELEMENT_NEW_LOOPFUNC = (1 << 16), + GST_ELEMENT_COTHREAD_STOPPING = (1 << 17), } GstElementFlags; -#define GST_ELEMENT_IS_MULTI_IN(obj) (GST_FLAGS(obj) & GST_ELEMENT_MULTI_IN) -#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAGS(obj) & GST_ELEMENT_THREAD_SUGGESTED) +#define GST_ELEMENT_IS_MULTI_IN(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_MULTI_IN)) +#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED)) +#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING)) typedef struct _GstElement GstElement; diff --git a/gst/gstpad.c b/gst/gstpad.c index 81cde7d..830a32e 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -289,6 +289,10 @@ void gst_pad_push (GstPad *pad, GstBuffer *buffer) { + GstPad *peer; + + DEBUG_ENTER("(pad:'%s',buffer:%p)",gst_pad_get_name(pad),buffer); + g_return_if_fail(pad != NULL); g_return_if_fail(GST_IS_PAD(pad)); g_return_if_fail(GST_PAD_CONNECTED(pad)); @@ -303,21 +307,23 @@ gst_pad_push (GstPad *pad, gst_trace_add_entry(NULL,0,buffer,"push buffer"); + peer = pad->peer; + g_return_if_fail(peer != NULL); + // first check to see if there's a push handler if (pad->pushfunc != NULL) { // put the buffer in peer's holding pen - pad->peer->bufpen = buffer; + peer->bufpen = buffer; // now inform the handler that the peer pad has something - (pad->pushfunc)(pad->peer); + (pad->pushfunc)(peer); // otherwise we assume we're chaining directly - } else if (pad->chainfunc != NULL) { + } else if (peer->chainfunc != NULL) { //g_print("-- gst_pad_push(): calling chain handler\n"); - (pad->chainfunc)(pad->peer,buffer); + (peer->chainfunc)(peer,buffer); // else we squawk } else { g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n"); } - } /** diff --git a/gst/gstpad.h b/gst/gstpad.h index ad0b97a..13bd9a4 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -39,8 +39,8 @@ extern "C" { #define GST_IS_PAD_CLASS(obj) (GTK_CHECK_CLASS_TYPE ((klass), GST_TYPE_PAD)) // quick test to see if the pad is connected -#define GST_PAD_CONNECTED(pad) (pad && (pad)->peer != NULL) -#define GST_PAD_CAN_PULL(pad) (pad && (pad)->pullfunc != NULL) +#define GST_PAD_CONNECTED(pad) ((pad) && (pad)->peer != NULL) +#define GST_PAD_CAN_PULL(pad) ((pad) && (pad)->pullfunc != NULL) typedef struct _GstPad GstPad; typedef struct _GstPadClass GstPadClass; @@ -77,9 +77,9 @@ struct _GstPad { GstBuffer *bufpen; GstPadChainFunction chainfunc; + GstPadPushFunction pushfunc; GstPadPullFunction pullfunc; GstPadPullRegionFunction pullregionfunc; - GstPadPushFunction pushfunc; GstPadQoSFunction qosfunc; GstObject *parent; @@ -102,6 +102,7 @@ GstPadDirection gst_pad_get_direction (GstPad *pad); void gst_pad_set_chain_function (GstPad *pad, GstPadChainFunction chain); void gst_pad_set_pull_function (GstPad *pad, GstPadPullFunction pull); +void gst_pad_set_pullregion_function (GstPad *pad, GstPadPullRegionFunction pullregion); void gst_pad_set_qos_function (GstPad *pad, GstPadQoSFunction qos); // FIXME is here for backward compatibility until we have GstCaps working... diff --git a/gst/gstpipeline.c b/gst/gstpipeline.c index 610da96..827f16b 100644 --- a/gst/gstpipeline.c +++ b/gst/gstpipeline.c @@ -161,9 +161,9 @@ gst_pipeline_typefind (GstPipeline *pipeline, GstElement *element) gst_element_set_state (GST_ELEMENT (element), GST_STATE_PLAYING); // keep pushing buffers... the have_type signal handler will set the found flag - while (!found) { - gst_src_push (GST_SRC (element)); - } +// while (!found) { +// gst_src_push(GST_SRC(element)); +// } gst_element_set_state (GST_ELEMENT (element), GST_STATE_NULL); diff --git a/gst/gstsrc.c b/gst/gstsrc.c index 53fac37..53bc1e5 100644 --- a/gst/gstsrc.c +++ b/gst/gstsrc.c @@ -97,48 +97,3 @@ gst_src_signal_eos (GstSrc *src) gtk_signal_emit (GTK_OBJECT (src), gst_src_signals[EOS]); } - -/** - * gst_src_push: - * @src: source to trigger the push of - * - * Push a buffer from the source. - */ -void -gst_src_push (GstSrc *src) -{ - GstSrcClass *oclass; - - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_SRC (src)); - - oclass = (GstSrcClass *)(GTK_OBJECT (src)->klass); - - g_return_if_fail (oclass->push != NULL); - - (oclass->push)(src); -} - -/** - * gst_src_push_region: - * @src: source to trigger the push of - * @offset: offset in source - * @size: number of bytes to push - * - * Push a buffer of a given size from the source. - */ -void -gst_src_push_region (GstSrc *src, gulong offset, gulong size) -{ - GstSrcClass *oclass; - - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_SRC (src)); - - oclass = (GstSrcClass *)(GTK_OBJECT (src)->klass); - - g_return_if_fail (oclass->push_region != NULL); - - (oclass->push_region)(src, offset, size); -} - diff --git a/gst/gstsrc.h b/gst/gstsrc.h index 190d8fa..4e115e8 100644 --- a/gst/gstsrc.h +++ b/gst/gstsrc.h @@ -54,17 +54,13 @@ typedef struct _GstSrc GstSrc; typedef struct _GstSrcClass GstSrcClass; struct _GstSrc { - GstElement element; - gint32 flags; + GstElement element; + gint32 flags; }; struct _GstSrcClass { GstElementClass parent_class; - /* subclass functions */ - void (*push) (GstSrc *src); - void (*push_region) (GstSrc *src, gulong offset, gulong size); - /* signals */ void (*eos) (GstSrc *src); }; @@ -74,12 +70,9 @@ struct _GstSrcClass { #define GST_SRC_UNSET_FLAGS(src,flag) \ G_STMT_START{ (GST_SRC_FLAGS (src) &= ~(flag)); }G_STMT_END -GtkType gst_src_get_type (void); - -void gst_src_push (GstSrc *src); -void gst_src_push_region (GstSrc *src, gulong offset, gulong size); +GtkType gst_src_get_type (void); -void gst_src_signal_eos (GstSrc *src); +void gst_src_signal_eos (GstSrc *src); #ifdef __cplusplus } diff --git a/plugins/elements/gstasyncdisksrc.c b/plugins/elements/gstasyncdisksrc.c index 57cc0dc..6de1348 100644 --- a/plugins/elements/gstasyncdisksrc.c +++ b/plugins/elements/gstasyncdisksrc.c @@ -57,8 +57,8 @@ static void gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc); static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_asyncdisksrc_push (GstSrc *src); -static void gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size); +static void gst_asyncdisksrc_pull (GstPad *pad); +static void gst_asyncdisksrc_pull_region (GstPad *pad, gulong offset, gulong size); static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element); @@ -113,9 +113,6 @@ gst_asyncdisksrc_class_init (GstAsyncDiskSrcClass *klass) gtkobject_class->get_arg = gst_asyncdisksrc_get_arg; gstelement_class->change_state = gst_asyncdisksrc_change_state; - - gstsrc_class->push = gst_asyncdisksrc_push; - gstsrc_class->push_region = gst_asyncdisksrc_push_region; } static void @@ -124,6 +121,8 @@ gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc) GST_SRC_SET_FLAGS (asyncdisksrc, GST_SRC_ASYNC); asyncdisksrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function (asyncdisksrc->srcpad,gst_asyncdisksrc_pull); + // FIXME must set pullregion gst_element_add_pad (GST_ELEMENT (asyncdisksrc), asyncdisksrc->srcpad); asyncdisksrc->filename = NULL; @@ -204,26 +203,24 @@ gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } /** - * gst_asyncdisksrc_push: - * @src: #GstSrc to push a buffer from + * gst_asyncdisksrc_pull: + * @pad: #GstPad to push a buffer from * * Push a new buffer from the asyncdisksrc at the current offset. */ static void -gst_asyncdisksrc_push (GstSrc *src) +gst_asyncdisksrc_pull (GstPad *pad) { - GstAsyncDiskSrc *asyncdisksrc; + GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); + g_return_if_fail (pad != NULL); + src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - asyncdisksrc = GST_ASYNCDISKSRC (src); - /* deal with EOF state */ - if (asyncdisksrc->curoffset >= asyncdisksrc->size) { - gst_src_signal_eos (GST_SRC (asyncdisksrc)); + if (src->curoffset >= src->size) { + gst_src_signal_eos (GST_SRC (src)); return; } @@ -234,30 +231,30 @@ gst_asyncdisksrc_push (GstSrc *src) g_return_if_fail (buf != NULL); /* simply set the buffer to point to the correct region of the file */ - GST_BUFFER_DATA (buf) = asyncdisksrc->map + asyncdisksrc->curoffset; - GST_BUFFER_OFFSET (buf) = asyncdisksrc->curoffset; + GST_BUFFER_DATA (buf) = src->map + src->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((asyncdisksrc->curoffset + asyncdisksrc->bytes_per_read) > - asyncdisksrc->size) { - GST_BUFFER_SIZE (buf) = asyncdisksrc->size - asyncdisksrc->curoffset; + if ((src->curoffset + src->bytes_per_read) > + src->size) { + GST_BUFFER_SIZE (buf) = src->size - src->curoffset; // FIXME: set the buffer's EOF bit here } else - GST_BUFFER_SIZE (buf) = asyncdisksrc->bytes_per_read; + GST_BUFFER_SIZE (buf) = src->bytes_per_read; - asyncdisksrc->curoffset += GST_BUFFER_SIZE (buf); + src->curoffset += GST_BUFFER_SIZE (buf); - if (asyncdisksrc->new_seek) { + if (src->new_seek) { GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLUSH); - asyncdisksrc->new_seek = FALSE; + src->new_seek = FALSE; } /* we're done, push the buffer off now */ - gst_pad_push (asyncdisksrc->srcpad, buf); + gst_pad_push (pad, buf); } /** - * gst_asyncdisksrc_push_region: + * gst_asyncdisksrc_pull_region: * @src: #GstSrc to push a buffer from * @offset: offset in file * @size: number of bytes @@ -265,20 +262,21 @@ gst_asyncdisksrc_push (GstSrc *src) * Push a new buffer from the asyncdisksrc of given size at given offset. */ static void -gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size) +gst_asyncdisksrc_pull_region (GstPad *pad, gulong offset, gulong size) { - GstAsyncDiskSrc *asyncdisksrc; + GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); + g_return_if_fail (pad != NULL); + + src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); + g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - asyncdisksrc = GST_ASYNCDISKSRC (src); - /* deal with EOF state */ - if (offset >= asyncdisksrc->size) { - gst_src_signal_eos (GST_SRC (asyncdisksrc)); + if (offset >= src->size) { + gst_src_signal_eos (GST_SRC (src)); return; } @@ -288,18 +286,18 @@ gst_asyncdisksrc_push_region (GstSrc *src, gulong offset, gulong size) g_return_if_fail (buf); /* simply set the buffer to point to the correct region of the file */ - GST_BUFFER_DATA (buf) = asyncdisksrc->map + offset; + GST_BUFFER_DATA (buf) = src->map + offset; GST_BUFFER_OFFSET (buf) = offset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((offset + size) > asyncdisksrc->size) { - GST_BUFFER_SIZE (buf) = asyncdisksrc->size - offset; + if ((offset + size) > src->size) { + GST_BUFFER_SIZE (buf) = src->size - offset; // FIXME: set the buffer's EOF bit here } else GST_BUFFER_SIZE (buf) = size; /* we're done, push the buffer off now */ - gst_pad_push (asyncdisksrc->srcpad,buf); + gst_pad_push (pad,buf); } diff --git a/plugins/elements/gstaudiosrc.c b/plugins/elements/gstaudiosrc.c index ee50978..6e89a77 100644 --- a/plugins/elements/gstaudiosrc.c +++ b/plugins/elements/gstaudiosrc.c @@ -64,7 +64,7 @@ static void gst_audiosrc_close_audio (GstAudioSrc *src); static gboolean gst_audiosrc_open_audio (GstAudioSrc *src); static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc); -static void gst_audiosrc_push (GstSrc *src); +static void gst_audiosrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 }; @@ -118,14 +118,13 @@ gst_audiosrc_class_init (GstAudioSrcClass *klass) gtkobject_class->get_arg = gst_audiosrc_get_arg; gstelement_class->change_state = gst_audiosrc_change_state; - - gstsrc_class->push = gst_audiosrc_push; } static void gst_audiosrc_init (GstAudioSrc *audiosrc) { audiosrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function(audiosrc->srcpad,gst_audiosrc_pull); gst_element_add_pad (GST_ELEMENT (audiosrc), audiosrc->srcpad); audiosrc->fd = -1; @@ -141,40 +140,37 @@ gst_audiosrc_init (GstAudioSrc *audiosrc) audiosrc->seq = 0; } -static void -gst_audiosrc_push (GstSrc *src) -{ - GstAudioSrc *audiosrc; +void gst_audiosrc_pull(GstPad *pad) { + GstAudioSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_AUDIOSRC (src)); - audiosrc = GST_AUDIOSRC (src); + g_return_if_fail(pad != NULL); + src = GST_AUDIOSRC(gst_pad_get_parent(pad)); // g_print("attempting to read something from soundcard\n"); buf = gst_buffer_new (); g_return_if_fail (buf); - GST_BUFFER_DATA (buf) = (gpointer)g_malloc (audiosrc->bytes_per_read); - - readbytes = read (audiosrc->fd,GST_BUFFER_DATA (buf), - audiosrc->bytes_per_read); + GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read); + readbytes = read (src->fd,GST_BUFFER_DATA (buf), + src->bytes_per_read); + if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (audiosrc)); + gst_src_signal_eos (GST_SRC (src)); return; } GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_OFFSET (buf) = audiosrc->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; - audiosrc->curoffset += readbytes; + src->curoffset += readbytes; // gst_buffer_add_meta(buf,GST_META(newmeta)); - gst_pad_push (audiosrc->srcpad,buf); + gst_pad_push (pad,buf); // g_print("pushed buffer from soundcard of %d bytes\n",readbytes); } diff --git a/plugins/elements/gstdisksrc.c b/plugins/elements/gstdisksrc.c index 171ec35..e985df3 100644 --- a/plugins/elements/gstdisksrc.c +++ b/plugins/elements/gstdisksrc.c @@ -60,7 +60,7 @@ static void gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_disksrc_close_file (GstDiskSrc *src); -static void gst_disksrc_push (GstSrc *src); +static void gst_disksrc_pull (GstPad *pad); static GstElementStateReturn gst_disksrc_change_state (GstElement *element); @@ -114,16 +114,13 @@ gst_disksrc_class_init (GstDiskSrcClass *klass) gtkobject_class->get_arg = gst_disksrc_get_arg; gstelement_class->change_state = gst_disksrc_change_state; - - gstsrc_class->push = gst_disksrc_push; - /* we nominally can't (won't) do async */ - gstsrc_class->push_region = NULL; } static void gst_disksrc_init (GstDiskSrc *disksrc) { disksrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function(disksrc->srcpad,gst_disksrc_pull); gst_element_add_pad (GST_ELEMENT (disksrc), disksrc->srcpad); disksrc->filename = NULL; @@ -205,18 +202,16 @@ gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } static void -gst_disksrc_push (GstSrc *src) +gst_disksrc_pull (GstPad *pad) { - GstDiskSrc *disksrc; + GstDiskSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_DISKSRC (src)); + g_return_if_fail (pad != NULL); + src = GST_DISKSRC(gst_pad_get_parent(pad)); g_return_if_fail (GST_FLAG_IS_SET (src, GST_DISKSRC_OPEN)); g_return_if_fail (GST_STATE (src) >= GST_STATE_READY); - - disksrc = GST_DISKSRC (src); /* create the buffer */ // FIXME: should eventually use a bufferpool for this @@ -224,39 +219,39 @@ gst_disksrc_push (GstSrc *src) g_return_if_fail (buf); /* allocate the space for the buffer data */ - GST_BUFFER_DATA (buf) = g_malloc (disksrc->bytes_per_read); + GST_BUFFER_DATA (buf) = g_malloc (src->bytes_per_read); g_return_if_fail (GST_BUFFER_DATA (buf) != NULL); /* read it in from the file */ - readbytes = read (disksrc->fd, GST_BUFFER_DATA (buf), disksrc->bytes_per_read); + readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->bytes_per_read); if (readbytes == -1) { perror ("read()"); gst_buffer_unref (buf); return; } else if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (disksrc)); + gst_src_signal_eos (GST_SRC (src)); gst_buffer_unref (buf); return; } /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < disksrc->bytes_per_read) + if (readbytes < src->bytes_per_read) GST_BUFFER_FLAG_SET (buf, GST_BUFFER_EOS); /* if we have a new buffer froma seek, mark it */ - if (disksrc->new_seek) { + if (src->new_seek) { GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLUSH); - disksrc->new_seek = FALSE; + src->new_seek = FALSE; } - GST_BUFFER_OFFSET (buf) = disksrc->curoffset; + GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_SIZE (buf) = readbytes; - disksrc->curoffset += readbytes; + src->curoffset += readbytes; DEBUG("pushing with offset %d\n", GST_BUFFER_OFFSET (buf)); /* we're done, push the buffer off now */ - gst_pad_push (disksrc->srcpad, buf); + gst_pad_push (pad, buf); DEBUG("pushing with offset %d done\n", GST_BUFFER_OFFSET (buf)); } diff --git a/plugins/elements/gstfakesrc.c b/plugins/elements/gstfakesrc.c index 3abe707..47688a3 100644 --- a/plugins/elements/gstfakesrc.c +++ b/plugins/elements/gstfakesrc.c @@ -46,7 +46,7 @@ enum { static void gst_fakesrc_class_init (GstFakeSrcClass *klass); static void gst_fakesrc_init (GstFakeSrc *fakesrc); -static void gst_fakesrc_push (GstSrc *src); +static void gst_fakesrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 }; @@ -80,42 +80,49 @@ gst_fakesrc_class_init (GstFakeSrcClass *klass) gstsrc_class = (GstSrcClass*)klass; parent_class = gtk_type_class (GST_TYPE_SRC); - - gstsrc_class->push = gst_fakesrc_push; - gstsrc_class->push_region = NULL; } -static void -gst_fakesrc_init (GstFakeSrc *fakesrc) -{ - fakesrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (fakesrc), fakesrc->srcpad); +static void gst_fakesrc_init(GstFakeSrc *fakesrc) { + // create our output pad + fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(fakesrc->srcpad,gst_fakesrc_pull); + gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad); // we're ready right away, since we don't have any args... // gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY); } /** - * gst_fakesrc_push: - * @src: the faksesrc to push + * gst_fakesrc_new: + * @name: then name of the fakse source + * + * create a new fakesrc + * + * Returns: The new element. + */ +GstElement *gst_fakesrc_new(gchar *name) { + GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC)); + gst_element_set_name(GST_ELEMENT(fakesrc),name); + return fakesrc; +} + +/** + * gst_fakesrc_pull: + * @src: the faksesrc to pull * * generate an empty buffer and push it to the next element. */ -static void -gst_fakesrc_push (GstSrc *src) -{ - GstFakeSrc *fakesrc; +void gst_fakesrc_pull(GstPad *pad) { + GstFakeSrc *src; GstBuffer *buf; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_FAKESRC (src)); - - fakesrc = GST_FAKESRC (src); + g_return_if_fail(pad != NULL); + src = GST_FAKESRC(gst_pad_get_parent(pad)); + g_return_if_fail(GST_IS_FAKESRC(src)); // g_print("gst_fakesrc_push(): pushing fake buffer from '%s'\n", -// gst_element_get_name(GST_ELEMENT(fakesrc))); +// gst_element_get_name(GST_ELEMENT(src))); g_print(">"); - buf = gst_buffer_new (); - - gst_pad_push (fakesrc->srcpad, buf); + buf = gst_buffer_new(); + gst_pad_push(pad,buf); } diff --git a/plugins/elements/gstfdsrc.c b/plugins/elements/gstfdsrc.c index 790eed2..5d33755 100644 --- a/plugins/elements/gstfdsrc.c +++ b/plugins/elements/gstfdsrc.c @@ -57,8 +57,7 @@ static void gst_fdsrc_init (GstFdSrc *fdsrc); static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_push (GstSrc *src); -//static void gst_fdsrc_push_region(GstSrc *src,gulong offset,gulong size); +static void gst_fdsrc_pull (GstPad *pad); static GstSrcClass *parent_class = NULL; @@ -105,18 +104,12 @@ gst_fdsrc_class_init (GstFdSrcClass *klass) gtkobject_class->set_arg = gst_fdsrc_set_arg; gtkobject_class->get_arg = gst_fdsrc_get_arg; - - gstsrc_class->push = gst_fdsrc_push; - - /* we nominally can't (won't) do async */ - gstsrc_class->push_region = NULL; } -static void -gst_fdsrc_init (GstFdSrc *fdsrc) -{ - fdsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (fdsrc), fdsrc->srcpad); +static void gst_fdsrc_init(GstFdSrc *fdsrc) { + fdsrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(fdsrc->srcpad,gst_fdsrc_pull); + gst_element_add_pad(GST_ELEMENT(fdsrc),fdsrc->srcpad); fdsrc->fd = 0; fdsrc->curoffset = 0; @@ -182,17 +175,13 @@ gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -static void -gst_fdsrc_push (GstSrc *src) -{ - GstFdSrc *fdsrc; +void gst_fdsrc_pull(GstPad *pad) { + GstFdSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_FDSRC (src)); - - fdsrc = GST_FDSRC (src); + g_return_if_fail(pad != NULL); + src = GST_FDSRC(gst_pad_get_parent(pad)); /* create the buffer */ // FIXME: should eventually use a bufferpool for this @@ -200,26 +189,25 @@ gst_fdsrc_push (GstSrc *src) g_return_if_fail (buf); /* allocate the space for the buffer data */ - GST_BUFFER_DATA (buf) = g_malloc (fdsrc->bytes_per_read); - g_return_if_fail (GST_BUFFER_DATA (buf) != NULL); + GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read); + g_return_if_fail(GST_BUFFER_DATA(buf) != NULL); /* read it in from the file */ - readbytes = read (fdsrc->fd, GST_BUFFER_DATA (buf), fdsrc->bytes_per_read); + readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (fdsrc)); + gst_src_signal_eos(GST_SRC(src)); return; } /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < fdsrc->bytes_per_read) { + if (readbytes < src->bytes_per_read) { // set the buffer's EOF bit here GST_BUFFER_FLAG_SET (buf, GST_BUFFER_EOS); } - GST_BUFFER_OFFSET (buf) = fdsrc->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - - fdsrc->curoffset += readbytes; + GST_BUFFER_OFFSET(buf) = src->curoffset; + GST_BUFFER_SIZE(buf) = readbytes; + src->curoffset += readbytes; /* we're done, push the buffer off now */ - gst_pad_push (fdsrc->srcpad, buf); + gst_pad_push(pad,buf); } diff --git a/plugins/elements/gsthttpsrc.c b/plugins/elements/gsthttpsrc.c index c805f73..ff27e7f 100644 --- a/plugins/elements/gsthttpsrc.c +++ b/plugins/elements/gsthttpsrc.c @@ -48,19 +48,18 @@ enum { ARG_OFFSET }; - static void gst_httpsrc_class_init (GstHttpSrcClass *klass); static void gst_httpsrc_init (GstHttpSrc *httpsrc); static void gst_httpsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); +static GstElementStateReturn gst_httpsrc_change_state (GstElement *element); -static void gst_httpsrc_push (GstSrc *src); +static void gst_httpsrc_pull (GstPad *pad); -static gboolean gst_httpsrc_open_url (GstHttpSrc *src); -static void gst_httpsrc_close_url (GstHttpSrc *src); +static gboolean gst_httpsrc_open_url (GstHttpSrc *src); +static void gst_httpsrc_close_url (GstHttpSrc *src); -static GstElementStateReturn gst_httpsrc_change_state(GstElement *element); static GstSrcClass *parent_class = NULL; //static guint gst_httpsrc_signals[LAST_SIGNAL] = { 0 }; @@ -109,16 +108,12 @@ gst_httpsrc_class_init (GstHttpSrcClass *klass) gtkobject_class->get_arg = gst_httpsrc_get_arg; gstelement_class->change_state = gst_httpsrc_change_state; - - gstsrc_class->push = gst_httpsrc_push; - gstsrc_class->push_region = NULL; } -static void -gst_httpsrc_init (GstHttpSrc *httpsrc) -{ - httpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (httpsrc), httpsrc->srcpad); +static void gst_httpsrc_init(GstHttpSrc *httpsrc) { + httpsrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(httpsrc->srcpad,gst_httpsrc_pull); + gst_element_add_pad(GST_ELEMENT(httpsrc),httpsrc->srcpad); httpsrc->url = NULL; httpsrc->request = NULL; @@ -127,37 +122,31 @@ gst_httpsrc_init (GstHttpSrc *httpsrc) httpsrc->bytes_per_read = 4096; } -static void -gst_httpsrc_push (GstSrc *src) -{ - GstHttpSrc *httpsrc; +static void gst_httpsrc_push(GstPad *pad) { + GstHttpSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail (src != NULL); - g_return_if_fail (GST_IS_HTTPSRC (src)); -// g_return_if_fail(GST_FLAG_IS_SET(src,GST_)); - httpsrc = GST_HTTPSRC (src); + g_return_if_fail(pad != NULL); + src = GST_HTTPSRC(gst_pad_get_parent(pad)); + + buf = gst_buffer_new(); + GST_BUFFER_DATA(buf) = (gpointer)malloc(src->bytes_per_read); + readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); - buf = gst_buffer_new (); - GST_BUFFER_DATA (buf) = (gpointer)malloc (httpsrc->bytes_per_read); - - readbytes = read (httpsrc->fd, GST_BUFFER_DATA (buf), httpsrc->bytes_per_read); - if (readbytes == 0) { - gst_src_signal_eos (GST_SRC (httpsrc)); + gst_src_signal_eos(GST_SRC(src)); return; } - if (readbytes < httpsrc->bytes_per_read) { + if (readbytes < src->bytes_per_read) { // FIXME: set the buffer's EOF bit here } - GST_BUFFER_OFFSET (buf) = httpsrc->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - - httpsrc->curoffset += readbytes; + GST_BUFFER_OFFSET(buf) = src->curoffset; + GST_BUFFER_SIZE(buf) = readbytes; + src->curoffset += readbytes; - gst_pad_push (httpsrc->srcpad, buf); + gst_pad_push(pad,buf); } static gboolean diff --git a/plugins/elements/gstsinesrc.c b/plugins/elements/gstsinesrc.c index cf302d0..271e0b3 100644 --- a/plugins/elements/gstsinesrc.c +++ b/plugins/elements/gstsinesrc.c @@ -62,7 +62,7 @@ static void gst_sinesrc_get_arg(GtkObject *object,GtkArg *arg,guint id); //static gboolean gst_sinesrc_open_audio(GstSineSrc *src); void gst_sinesrc_sync_parms(GstSineSrc *sinesrc); -void gst_sinesrc_push(GstSrc *src); +void gst_sinesrc_pull(GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 }; @@ -112,12 +112,11 @@ gst_sinesrc_class_init(GstSineSrcClass *klass) { gtkobject_class->get_arg = gst_sinesrc_get_arg; // gstelement_class->change_state = gst_sinesrc_change_state; - - gstsrc_class->push = gst_sinesrc_push; } static void gst_sinesrc_init(GstSineSrc *sinesrc) { sinesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_pull_function(sinesrc->srcpad,gst_sinesrc_pull); gst_element_add_pad(GST_ELEMENT(sinesrc),sinesrc->srcpad); sinesrc->volume = 1.0; @@ -143,17 +142,16 @@ GstElement *gst_sinesrc_new_with_fd(gchar *name,gchar *filename) { return sinesrc; } -void gst_sinesrc_push(GstSrc *src) { - GstSineSrc *sinesrc; +void gst_sinesrc_pull(GstPad *pad) { + GstSineSrc *src; GstBuffer *buf; gint16 *samples; gint i; gint volume; gdouble val; - g_return_if_fail(src != NULL); - g_return_if_fail(GST_IS_SINESRC(src)); - sinesrc = GST_SINESRC(src); + g_return_if_fail(pad != NULL); + src = GST_SINESRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); g_return_if_fail(buf); @@ -161,21 +159,21 @@ void gst_sinesrc_push(GstSrc *src) { samples = (gint16*)GST_BUFFER_DATA(buf); GST_BUFFER_SIZE(buf) = 4096; - volume = 65535 * sinesrc->volume; + volume = 65535 * src->volume; for (i=0;i<1024;i++) { - val = sin((gdouble)i/sinesrc->frequency); + val = sin((gdouble)i/src->frequency); samples[i] = val * volume; samples[i+1] = samples[i]; } - if (!sinesrc->sentmeta) { + if (!src->sentmeta) { MetaAudioRaw *newmeta = g_new(MetaAudioRaw,1); - memcpy(newmeta,&sinesrc->meta,sizeof(MetaAudioRaw)); + memcpy(newmeta,&src->meta,sizeof(MetaAudioRaw)); gst_buffer_add_meta(buf,GST_META(newmeta)); - sinesrc->sentmeta = TRUE; + src->sentmeta = TRUE; } - gst_pad_push(sinesrc->srcpad,buf); + gst_pad_push(pad,buf); g_print(">"); } -- 2.7.4