From 8a89e0b83e8a10f71861e2a2e061b0d95fbe190c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sun, 20 May 2001 14:12:36 +0000 Subject: [PATCH] Made pipefilter a DECOUPLED element. Original commit message from CVS: Made pipefilter a DECOUPLED element. --- gst/elements/gstpipefilter.c | 142 ++++++++++++++++++++++++--------------- plugins/elements/gstpipefilter.c | 142 ++++++++++++++++++++++++--------------- 2 files changed, 178 insertions(+), 106 deletions(-) diff --git a/gst/elements/gstpipefilter.c b/gst/elements/gstpipefilter.c index 432a762..8b20e8b 100644 --- a/gst/elements/gstpipefilter.c +++ b/gst/elements/gstpipefilter.c @@ -38,7 +38,8 @@ GstElementDetails gst_pipefilter_details = { "Filter", "Pass data without modification", VERSION, - "Erik Walthinsen ", + "Erik Walthinsen \n" + "Wim Taymans ", "(C) 1999", }; @@ -55,20 +56,24 @@ enum { }; -static void gst_pipefilter_class_init(GstPipefilterClass *klass); -static void gst_pipefilter_init(GstPipefilter *pipefilter); -static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id); -static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id); +static void gst_pipefilter_class_init (GstPipefilterClass *klass); +static void gst_pipefilter_init (GstPipefilter *pipefilter); -void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf); +static void gst_pipefilter_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_pipefilter_get_arg (GtkObject *object, GtkArg *arg, guint id); -static GstElementStateReturn gst_pipefilter_change_state(GstElement *element); +static GstBuffer* gst_pipefilter_get (GstPad *pad); +static void gst_pipefilter_chain (GstPad *pad, GstBuffer *buf); +static gboolean gst_pipefilter_handle_eos (GstPad *pad); + +static GstElementStateReturn gst_pipefilter_change_state (GstElement *element); static GstElementClass *parent_class = NULL; //static guint gst_pipefilter_signals[LAST_SIGNAL] = { 0 }; GtkType -gst_pipefilter_get_type(void) { +gst_pipefilter_get_type (void) +{ static GtkType pipefilter_type = 0; if (!pipefilter_type) { @@ -87,7 +92,9 @@ gst_pipefilter_get_type(void) { return pipefilter_type; } -static void gst_pipefilter_class_init(GstPipefilterClass *klass) { +static void +gst_pipefilter_class_init (GstPipefilterClass *klass) +{ GtkObjectClass *gtkobject_class; GstElementClass *gstelement_class; @@ -105,12 +112,19 @@ static void gst_pipefilter_class_init(GstPipefilterClass *klass) { gtkobject_class->get_arg = gst_pipefilter_get_arg; } -static void gst_pipefilter_init(GstPipefilter *pipefilter) { - pipefilter->sinkpad = gst_pad_new("sink",GST_PAD_SINK); - gst_element_add_pad(GST_ELEMENT(pipefilter),pipefilter->sinkpad); - gst_pad_set_chain_function(pipefilter->sinkpad,gst_pipefilter_chain); - pipefilter->srcpad = gst_pad_new("src",GST_PAD_SRC); - gst_element_add_pad(GST_ELEMENT(pipefilter),pipefilter->srcpad); +static void +gst_pipefilter_init (GstPipefilter *pipefilter) +{ + GST_FLAG_SET (pipefilter, GST_ELEMENT_DECOUPLED); + + pipefilter->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); + gst_element_add_pad (GST_ELEMENT (pipefilter), pipefilter->sinkpad); + gst_pad_set_chain_function (pipefilter->sinkpad, gst_pipefilter_chain); + gst_pad_set_eos_function (pipefilter->sinkpad, gst_pipefilter_handle_eos); + + pipefilter->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_element_add_pad (GST_ELEMENT (pipefilter), pipefilter->srcpad); + gst_pad_set_get_function (pipefilter->srcpad, gst_pipefilter_get); pipefilter->command = NULL; pipefilter->curoffset = 0; @@ -118,51 +132,67 @@ static void gst_pipefilter_init(GstPipefilter *pipefilter) { pipefilter->seq = 0; } -static gboolean gst_pipefilter_read_and_push(GstPipefilter *pipefilter) { +static gboolean +gst_pipefilter_handle_eos (GstPad *pad) +{ + GstPipefilter *pipefilter; + + pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); + + GST_DEBUG (0,"pipefilter: %s received eos\n", GST_ELEMENT_NAME (pipefilter)); + if (close (pipefilter->fdin[1]) < 0) + perror("close"); + if (close (pipefilter->fdout[0]) < 0) + perror("close"); + + GST_FLAG_SET (pad, GST_PAD_EOS); + + return TRUE; +} + +static GstBuffer* +gst_pipefilter_get (GstPad *pad) +{ + GstPipefilter *pipefilter; GstBuffer *newbuf; glong readbytes; + pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); + /* create the buffer */ // FIXME: should eventually use a bufferpool for this newbuf = gst_buffer_new(); - g_return_val_if_fail(newbuf, FALSE); + g_return_val_if_fail(newbuf, NULL); /* allocate the space for the buffer data */ GST_BUFFER_DATA(newbuf) = g_malloc(pipefilter->bytes_per_read); - g_return_val_if_fail(GST_BUFFER_DATA(newbuf) != NULL, FALSE); + g_return_val_if_fail(GST_BUFFER_DATA(newbuf) != NULL, NULL); /* read it in from the file */ GST_DEBUG (0,"attemting to read %ld bytes\n", pipefilter->bytes_per_read); - readbytes = read(pipefilter->fdout[0],GST_BUFFER_DATA(newbuf),pipefilter->bytes_per_read); + readbytes = read(pipefilter->fdout[0], GST_BUFFER_DATA(newbuf), pipefilter->bytes_per_read); GST_DEBUG (0,"read %ld bytes\n", readbytes); if (readbytes < 0) { - if (errno == EAGAIN) { - GST_DEBUG (0,"no input yet\n"); - gst_buffer_unref(newbuf); - return FALSE; - } - else { - perror("read"); - gst_element_error(GST_ELEMENT(pipefilter),"reading"); - return FALSE; - } + perror("read"); + gst_element_error(GST_ELEMENT(pipefilter),"reading"); + return NULL; } + /* if we didn't get as many bytes as we asked for, we're at EOF */ if (readbytes == 0) { - gst_buffer_unref(newbuf); - return FALSE; + gst_pad_set_eos (pad); + return NULL; } - /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < pipefilter->bytes_per_read) - GST_BUFFER_FLAG_SET(newbuf,GST_BUFFER_EOS); + GST_BUFFER_OFFSET(newbuf) = pipefilter->curoffset; GST_BUFFER_SIZE(newbuf) = readbytes; pipefilter->curoffset += readbytes; - /* we're done, push the buffer off now */ - gst_pad_push(pipefilter->srcpad,newbuf); - return TRUE; + return newbuf; } -void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { + +static void +gst_pipefilter_chain (GstPad *pad,GstBuffer *buf) +{ GstPipefilter *pipefilter; glong writebytes; guchar *data; @@ -174,8 +204,6 @@ void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); - while (gst_pipefilter_read_and_push(pipefilter)); - data = GST_BUFFER_DATA(buf); size = GST_BUFFER_SIZE(buf); @@ -188,11 +216,11 @@ void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { return; } gst_buffer_unref(buf); - - while (gst_pipefilter_read_and_push(pipefilter)); } -static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id) { +static void +gst_pipefilter_set_arg (GtkObject *object,GtkArg *arg,guint id) +{ GstPipefilter *pipefilter; /* it's not null if we got it, but it might not be ours */ @@ -209,7 +237,9 @@ static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id) { } } -static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id) { +static void +gst_pipefilter_get_arg (GtkObject *object,GtkArg *arg,guint id) +{ GstPipefilter *pipefilter; /* it's not null if we got it, but it might not be ours */ @@ -227,18 +257,14 @@ static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id) { } /* open the file, necessary to go to RUNNING state */ -static gboolean gst_pipefilter_open_file(GstPipefilter *src) { +static gboolean +gst_pipefilter_open_file (GstPipefilter *src) +{ g_return_val_if_fail(!GST_FLAG_IS_SET(src,GST_PIPEFILTER_OPEN), FALSE); pipe(src->fdin); pipe(src->fdout); - if (fcntl(src->fdout[0], F_SETFL, O_NONBLOCK) < 0) { - perror("fcntl"); - gst_element_error(GST_ELEMENT(src),"fcntl"); - return FALSE; - } - if((src->childpid = fork()) == -1) { perror("fork"); @@ -248,6 +274,8 @@ static gboolean gst_pipefilter_open_file(GstPipefilter *src) { if(src->childpid == 0) { + close(src->fdin[1]); + close(src->fdout[0]); // child dup2(src->fdin[0], STDIN_FILENO); /* set the childs input stream */ dup2(src->fdout[1], STDOUT_FILENO); /* set the childs output stream */ @@ -258,13 +286,19 @@ static gboolean gst_pipefilter_open_file(GstPipefilter *src) { return FALSE; } + else { + close(src->fdin[0]); + close(src->fdout[1]); + } GST_FLAG_SET(src,GST_PIPEFILTER_OPEN); return TRUE; } /* close the file */ -static void gst_pipefilter_close_file(GstPipefilter *src) { +static void +gst_pipefilter_close_file (GstPipefilter *src) +{ g_return_if_fail(GST_FLAG_IS_SET(src,GST_PIPEFILTER_OPEN)); /* close the file */ @@ -280,7 +314,9 @@ static void gst_pipefilter_close_file(GstPipefilter *src) { GST_FLAG_UNSET(src,GST_PIPEFILTER_OPEN); } -static GstElementStateReturn gst_pipefilter_change_state(GstElement *element) { +static GstElementStateReturn +gst_pipefilter_change_state (GstElement *element) +{ g_return_val_if_fail(GST_IS_PIPEFILTER(element), FALSE); /* if going down into NULL state, close the file if it's open */ diff --git a/plugins/elements/gstpipefilter.c b/plugins/elements/gstpipefilter.c index 432a762..8b20e8b 100644 --- a/plugins/elements/gstpipefilter.c +++ b/plugins/elements/gstpipefilter.c @@ -38,7 +38,8 @@ GstElementDetails gst_pipefilter_details = { "Filter", "Pass data without modification", VERSION, - "Erik Walthinsen ", + "Erik Walthinsen \n" + "Wim Taymans ", "(C) 1999", }; @@ -55,20 +56,24 @@ enum { }; -static void gst_pipefilter_class_init(GstPipefilterClass *klass); -static void gst_pipefilter_init(GstPipefilter *pipefilter); -static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id); -static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id); +static void gst_pipefilter_class_init (GstPipefilterClass *klass); +static void gst_pipefilter_init (GstPipefilter *pipefilter); -void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf); +static void gst_pipefilter_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_pipefilter_get_arg (GtkObject *object, GtkArg *arg, guint id); -static GstElementStateReturn gst_pipefilter_change_state(GstElement *element); +static GstBuffer* gst_pipefilter_get (GstPad *pad); +static void gst_pipefilter_chain (GstPad *pad, GstBuffer *buf); +static gboolean gst_pipefilter_handle_eos (GstPad *pad); + +static GstElementStateReturn gst_pipefilter_change_state (GstElement *element); static GstElementClass *parent_class = NULL; //static guint gst_pipefilter_signals[LAST_SIGNAL] = { 0 }; GtkType -gst_pipefilter_get_type(void) { +gst_pipefilter_get_type (void) +{ static GtkType pipefilter_type = 0; if (!pipefilter_type) { @@ -87,7 +92,9 @@ gst_pipefilter_get_type(void) { return pipefilter_type; } -static void gst_pipefilter_class_init(GstPipefilterClass *klass) { +static void +gst_pipefilter_class_init (GstPipefilterClass *klass) +{ GtkObjectClass *gtkobject_class; GstElementClass *gstelement_class; @@ -105,12 +112,19 @@ static void gst_pipefilter_class_init(GstPipefilterClass *klass) { gtkobject_class->get_arg = gst_pipefilter_get_arg; } -static void gst_pipefilter_init(GstPipefilter *pipefilter) { - pipefilter->sinkpad = gst_pad_new("sink",GST_PAD_SINK); - gst_element_add_pad(GST_ELEMENT(pipefilter),pipefilter->sinkpad); - gst_pad_set_chain_function(pipefilter->sinkpad,gst_pipefilter_chain); - pipefilter->srcpad = gst_pad_new("src",GST_PAD_SRC); - gst_element_add_pad(GST_ELEMENT(pipefilter),pipefilter->srcpad); +static void +gst_pipefilter_init (GstPipefilter *pipefilter) +{ + GST_FLAG_SET (pipefilter, GST_ELEMENT_DECOUPLED); + + pipefilter->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); + gst_element_add_pad (GST_ELEMENT (pipefilter), pipefilter->sinkpad); + gst_pad_set_chain_function (pipefilter->sinkpad, gst_pipefilter_chain); + gst_pad_set_eos_function (pipefilter->sinkpad, gst_pipefilter_handle_eos); + + pipefilter->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_element_add_pad (GST_ELEMENT (pipefilter), pipefilter->srcpad); + gst_pad_set_get_function (pipefilter->srcpad, gst_pipefilter_get); pipefilter->command = NULL; pipefilter->curoffset = 0; @@ -118,51 +132,67 @@ static void gst_pipefilter_init(GstPipefilter *pipefilter) { pipefilter->seq = 0; } -static gboolean gst_pipefilter_read_and_push(GstPipefilter *pipefilter) { +static gboolean +gst_pipefilter_handle_eos (GstPad *pad) +{ + GstPipefilter *pipefilter; + + pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); + + GST_DEBUG (0,"pipefilter: %s received eos\n", GST_ELEMENT_NAME (pipefilter)); + if (close (pipefilter->fdin[1]) < 0) + perror("close"); + if (close (pipefilter->fdout[0]) < 0) + perror("close"); + + GST_FLAG_SET (pad, GST_PAD_EOS); + + return TRUE; +} + +static GstBuffer* +gst_pipefilter_get (GstPad *pad) +{ + GstPipefilter *pipefilter; GstBuffer *newbuf; glong readbytes; + pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); + /* create the buffer */ // FIXME: should eventually use a bufferpool for this newbuf = gst_buffer_new(); - g_return_val_if_fail(newbuf, FALSE); + g_return_val_if_fail(newbuf, NULL); /* allocate the space for the buffer data */ GST_BUFFER_DATA(newbuf) = g_malloc(pipefilter->bytes_per_read); - g_return_val_if_fail(GST_BUFFER_DATA(newbuf) != NULL, FALSE); + g_return_val_if_fail(GST_BUFFER_DATA(newbuf) != NULL, NULL); /* read it in from the file */ GST_DEBUG (0,"attemting to read %ld bytes\n", pipefilter->bytes_per_read); - readbytes = read(pipefilter->fdout[0],GST_BUFFER_DATA(newbuf),pipefilter->bytes_per_read); + readbytes = read(pipefilter->fdout[0], GST_BUFFER_DATA(newbuf), pipefilter->bytes_per_read); GST_DEBUG (0,"read %ld bytes\n", readbytes); if (readbytes < 0) { - if (errno == EAGAIN) { - GST_DEBUG (0,"no input yet\n"); - gst_buffer_unref(newbuf); - return FALSE; - } - else { - perror("read"); - gst_element_error(GST_ELEMENT(pipefilter),"reading"); - return FALSE; - } + perror("read"); + gst_element_error(GST_ELEMENT(pipefilter),"reading"); + return NULL; } + /* if we didn't get as many bytes as we asked for, we're at EOF */ if (readbytes == 0) { - gst_buffer_unref(newbuf); - return FALSE; + gst_pad_set_eos (pad); + return NULL; } - /* if we didn't get as many bytes as we asked for, we're at EOF */ - if (readbytes < pipefilter->bytes_per_read) - GST_BUFFER_FLAG_SET(newbuf,GST_BUFFER_EOS); + GST_BUFFER_OFFSET(newbuf) = pipefilter->curoffset; GST_BUFFER_SIZE(newbuf) = readbytes; pipefilter->curoffset += readbytes; - /* we're done, push the buffer off now */ - gst_pad_push(pipefilter->srcpad,newbuf); - return TRUE; + return newbuf; } -void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { + +static void +gst_pipefilter_chain (GstPad *pad,GstBuffer *buf) +{ GstPipefilter *pipefilter; glong writebytes; guchar *data; @@ -174,8 +204,6 @@ void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { pipefilter = GST_PIPEFILTER (gst_pad_get_parent (pad)); - while (gst_pipefilter_read_and_push(pipefilter)); - data = GST_BUFFER_DATA(buf); size = GST_BUFFER_SIZE(buf); @@ -188,11 +216,11 @@ void gst_pipefilter_chain(GstPad *pad,GstBuffer *buf) { return; } gst_buffer_unref(buf); - - while (gst_pipefilter_read_and_push(pipefilter)); } -static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id) { +static void +gst_pipefilter_set_arg (GtkObject *object,GtkArg *arg,guint id) +{ GstPipefilter *pipefilter; /* it's not null if we got it, but it might not be ours */ @@ -209,7 +237,9 @@ static void gst_pipefilter_set_arg(GtkObject *object,GtkArg *arg,guint id) { } } -static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id) { +static void +gst_pipefilter_get_arg (GtkObject *object,GtkArg *arg,guint id) +{ GstPipefilter *pipefilter; /* it's not null if we got it, but it might not be ours */ @@ -227,18 +257,14 @@ static void gst_pipefilter_get_arg(GtkObject *object,GtkArg *arg,guint id) { } /* open the file, necessary to go to RUNNING state */ -static gboolean gst_pipefilter_open_file(GstPipefilter *src) { +static gboolean +gst_pipefilter_open_file (GstPipefilter *src) +{ g_return_val_if_fail(!GST_FLAG_IS_SET(src,GST_PIPEFILTER_OPEN), FALSE); pipe(src->fdin); pipe(src->fdout); - if (fcntl(src->fdout[0], F_SETFL, O_NONBLOCK) < 0) { - perror("fcntl"); - gst_element_error(GST_ELEMENT(src),"fcntl"); - return FALSE; - } - if((src->childpid = fork()) == -1) { perror("fork"); @@ -248,6 +274,8 @@ static gboolean gst_pipefilter_open_file(GstPipefilter *src) { if(src->childpid == 0) { + close(src->fdin[1]); + close(src->fdout[0]); // child dup2(src->fdin[0], STDIN_FILENO); /* set the childs input stream */ dup2(src->fdout[1], STDOUT_FILENO); /* set the childs output stream */ @@ -258,13 +286,19 @@ static gboolean gst_pipefilter_open_file(GstPipefilter *src) { return FALSE; } + else { + close(src->fdin[0]); + close(src->fdout[1]); + } GST_FLAG_SET(src,GST_PIPEFILTER_OPEN); return TRUE; } /* close the file */ -static void gst_pipefilter_close_file(GstPipefilter *src) { +static void +gst_pipefilter_close_file (GstPipefilter *src) +{ g_return_if_fail(GST_FLAG_IS_SET(src,GST_PIPEFILTER_OPEN)); /* close the file */ @@ -280,7 +314,9 @@ static void gst_pipefilter_close_file(GstPipefilter *src) { GST_FLAG_UNSET(src,GST_PIPEFILTER_OPEN); } -static GstElementStateReturn gst_pipefilter_change_state(GstElement *element) { +static GstElementStateReturn +gst_pipefilter_change_state (GstElement *element) +{ g_return_val_if_fail(GST_IS_PIPEFILTER(element), FALSE); /* if going down into NULL state, close the file if it's open */ -- 2.7.4