From 71491998d56032e10ad791a6d23784caa6cb487a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 21 Sep 2005 09:13:32 +0000 Subject: [PATCH] gst/elements/gstfdsrc.*: Properly implement fdsrc. Removed signal and timeout, better implemented somewhere else. Original commit message from CVS: * gst/elements/gstfdsrc.c: (gst_fdsrc_class_init), (gst_fdsrc_init), (gst_fdsrc_start), (gst_fdsrc_stop), (gst_fdsrc_unlock), (gst_fdsrc_set_property), (gst_fdsrc_get_property), (gst_fdsrc_create): * gst/elements/gstfdsrc.h: Properly implement fdsrc. Removed signal and timeout, better implemented somewhere else. --- ChangeLog | 10 ++ gst/elements/gstfdsrc.c | 229 +++++++++++++++++++++++++------------------- gst/elements/gstfdsrc.h | 6 +- plugins/elements/gstfdsrc.c | 229 +++++++++++++++++++++++++------------------- plugins/elements/gstfdsrc.h | 6 +- 5 files changed, 274 insertions(+), 206 deletions(-) diff --git a/ChangeLog b/ChangeLog index a9c2715..49febfc 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2005-09-21 Wim Taymans + + * gst/elements/gstfdsrc.c: (gst_fdsrc_class_init), + (gst_fdsrc_init), (gst_fdsrc_start), (gst_fdsrc_stop), + (gst_fdsrc_unlock), (gst_fdsrc_set_property), + (gst_fdsrc_get_property), (gst_fdsrc_create): + * gst/elements/gstfdsrc.h: + Properly implement fdsrc. Removed signal and timeout, + better implemented somewhere else. + 2005-09-21 Stefan Kost * docs/gst/tmpl/.cvsignore: diff --git a/gst/elements/gstfdsrc.c b/gst/elements/gstfdsrc.c index dfc77a5..2666191 100644 --- a/gst/elements/gstfdsrc.c +++ b/gst/elements/gstfdsrc.c @@ -28,6 +28,7 @@ #include #include +#include #include #include #ifdef HAVE_UNISTD_H @@ -38,6 +39,24 @@ #include "gstfdsrc.h" +/* the select call is also performed on the control sockets, that way + * we can send special commands to unblock the select call */ +#define CONTROL_STOP 'S' /* stop the select call */ +#define CONTROL_SOCKETS(src) src->control_sock +#define WRITE_SOCKET(src) src->control_sock[1] +#define READ_SOCKET(src) src->control_sock[0] + +#define SEND_COMMAND(src, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (WRITE_SOCKET(src), &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(src, command, res) \ +G_STMT_START { \ + res = read(READ_SOCKET(src), &command, 1); \ +} G_STMT_END + #define DEFAULT_BLOCKSIZE 4096 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -69,8 +88,6 @@ enum ARG_TIMEOUT }; -static guint gst_fdsrc_signals[LAST_SIGNAL] = { 0 }; - #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_fdsrc_debug, "fdsrc", 0, "fdsrc element"); @@ -82,8 +99,9 @@ static void gst_fdsrc_set_property (GObject * object, guint prop_id, static void gst_fdsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstStateChangeReturn gst_fdsrc_change_state (GstElement * element, - GstStateChange transition); +static gboolean gst_fdsrc_start (GstBaseSrc * bsrc); +static gboolean gst_fdsrc_stop (GstBaseSrc * bsrc); +static gboolean gst_fdsrc_unlock (GstBaseSrc * bsrc); static GstFlowReturn gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf); @@ -101,10 +119,11 @@ gst_fdsrc_class_init (GstFdSrcClass * klass) { GObjectClass *gobject_class; GstBaseSrcClass *gstbasesrc_class; - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstElementClass *gstelement_class; GstPushSrcClass *gstpush_src_class; gobject_class = G_OBJECT_CLASS (klass); + gstelement_class = GST_ELEMENT_CLASS (klass); gstbasesrc_class = (GstBaseSrcClass *) klass; gstpush_src_class = (GstPushSrcClass *) klass; @@ -116,23 +135,12 @@ gst_fdsrc_class_init (GstFdSrcClass * klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_FD, g_param_spec_int ("fd", "fd", "An open file descriptor to read from", 0, G_MAXINT, 0, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCKSIZE, - g_param_spec_ulong ("blocksize", "Block size", - "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE, - G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, - g_param_spec_uint64 ("timeout", "Timeout", "Read timeout in nanoseconds", - 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); - gst_fdsrc_signals[SIGNAL_TIMEOUT] = - g_signal_new ("timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, - G_STRUCT_OFFSET (GstFdSrcClass, timeout), NULL, NULL, - g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - gstelement_class->change_state = gst_fdsrc_change_state; + gstbasesrc_class->start = gst_fdsrc_start; + gstbasesrc_class->stop = gst_fdsrc_stop; + gstbasesrc_class->unlock = gst_fdsrc_unlock; gstpush_src_class->create = gst_fdsrc_create; - } static void @@ -144,36 +152,57 @@ gst_fdsrc_init (GstFdSrc * fdsrc, GstFdSrcClass * klass) fdsrc->fd = 0; fdsrc->curoffset = 0; - fdsrc->blocksize = DEFAULT_BLOCKSIZE; - fdsrc->timeout = 0; - fdsrc->seq = 0; } -static GstStateChangeReturn -gst_fdsrc_change_state (GstElement * element, GstStateChange transition) + +static gboolean +gst_fdsrc_start (GstBaseSrc * bsrc) { - GstFdSrc *src = GST_FDSRC (element); + GstFdSrc *src = GST_FDSRC (bsrc); + gint control_sock[2]; - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - src->curoffset = 0; - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - break; - default: - break; + src->curoffset = 0; + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + goto socket_pair; + + READ_SOCKET (src) = control_sock[0]; + WRITE_SOCKET (src) = control_sock[1]; + + fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); + + return TRUE; + + /* ERRORS */ +socket_pair: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + GST_ERROR_SYSTEM); + return FALSE; } +} + +static gboolean +gst_fdsrc_stop (GstBaseSrc * bsrc) +{ + GstFdSrc *src = GST_FDSRC (bsrc); - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + close (READ_SOCKET (src)); + close (WRITE_SOCKET (src)); - return GST_STATE_CHANGE_SUCCESS; + return TRUE; } +static gboolean +gst_fdsrc_unlock (GstBaseSrc * bsrc) +{ + GstFdSrc *src = GST_FDSRC (bsrc); + + SEND_COMMAND (src, CONTROL_STOP); + + return TRUE; +} static void gst_fdsrc_set_property (GObject * object, guint prop_id, const GValue * value, @@ -189,12 +218,6 @@ gst_fdsrc_set_property (GObject * object, guint prop_id, const GValue * value, case ARG_FD: src->fd = g_value_get_int (value); break; - case ARG_BLOCKSIZE: - src->blocksize = g_value_get_ulong (value); - break; - case ARG_TIMEOUT: - src->timeout = g_value_get_uint64 (value); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -212,34 +235,25 @@ gst_fdsrc_get_property (GObject * object, guint prop_id, GValue * value, src = GST_FDSRC (object); switch (prop_id) { - case ARG_BLOCKSIZE: - g_value_set_ulong (value, src->blocksize); - break; case ARG_FD: g_value_set_int (value, src->fd); break; - case ARG_TIMEOUT: - g_value_set_uint64 (value, src->timeout); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } -#define SELECT_TIMEOUT (GST_SECOND / 20) - static GstFlowReturn gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstFdSrc *src; GstBuffer *buf; glong readbytes; - GstClockTime timeout; + guint blocksize; #ifndef HAVE_WIN32 fd_set readfds; - struct timeval t, *tp = &t; gint retval; #endif @@ -248,65 +262,82 @@ gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) #ifndef HAVE_WIN32 FD_ZERO (&readfds); FD_SET (src->fd, &readfds); - - if (src->timeout != 0) { - timeout = MIN (SELECT_TIMEOUT, src->timeout); - } else { - timeout = SELECT_TIMEOUT; - } + FD_SET (READ_SOCKET (src), &readfds); do { - GST_TIME_TO_TIMEVAL (timeout, t); - if (src->timeout != 0) - timeout -= MIN (timeout, SELECT_TIMEOUT); - - retval = select (src->fd + 1, &readfds, NULL, NULL, tp); - - /* Check whether the element got shutdown before full timeout */ - if (retval == 0) { - if (GST_PAD_IS_FLUSHING (GST_BASE_SRC_PAD (src))) { - GST_DEBUG_OBJECT (src, "Shutting down with no buffer."); - return GST_FLOW_WRONG_STATE; + retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL); + } while ((retval == -1 && errno == EINTR)); + + if (retval == -1) + goto select_error; + + if (FD_ISSET (READ_SOCKET (src), &readfds)) { + /* read all stop commands */ + while (TRUE) { + gchar command; + int res; + + READ_COMMAND (src, command, res); + if (res < 0) { + GST_LOG_OBJECT (src, "no more commands"); + /* no more commands */ + break; } } - } while ((retval == -1 && errno == EINTR) || /* retry if interrupted */ - (retval == 0 && timeout > 0)); /* Retry on incomplete timeout */ - - if (retval == -1) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select on file descriptor: %s.", g_strerror (errno))); - GST_DEBUG_OBJECT (psrc, "Error during select"); - return GST_FLOW_ERROR; - } else if (retval == 0) { - g_signal_emit (G_OBJECT (src), gst_fdsrc_signals[SIGNAL_TIMEOUT], 0); - GST_DEBUG_OBJECT (psrc, "Timeout in select"); - return GST_FLOW_ERROR; + goto stopped; } #endif + blocksize = GST_BASE_SRC (src)->blocksize; + /* create the buffer */ - buf = gst_buffer_new_and_alloc (src->blocksize); + buf = gst_buffer_new_and_alloc (blocksize); do { - readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize); } while (readbytes == -1 && errno == EINTR); /* retry if interrupted */ - if (readbytes > 0) { - GST_BUFFER_OFFSET (buf) = src->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; - src->curoffset += readbytes; + if (readbytes < 0) + goto read_error; + + if (readbytes == 0) + goto eos; - GST_DEBUG_OBJECT (psrc, "Read buffer of size %u.", readbytes); + GST_BUFFER_OFFSET (buf) = src->curoffset; + GST_BUFFER_SIZE (buf) = readbytes; + GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; + src->curoffset += readbytes; - /* we're done, return the buffer */ - *outbuf = buf; - return GST_FLOW_OK; - } else if (readbytes == 0) { + GST_DEBUG_OBJECT (psrc, "Read buffer of size %u.", readbytes); + + /* we're done, return the buffer */ + *outbuf = buf; + + return GST_FLOW_OK; + + /* ERRORS */ +#ifndef HAVE_WIN32 +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("select on file descriptor: %s.", g_strerror (errno))); + GST_DEBUG_OBJECT (psrc, "Error during select"); + return GST_FLOW_ERROR; + } +stopped: + { + GST_DEBUG_OBJECT (psrc, "Select stopped"); + return GST_FLOW_WRONG_STATE; + } +#endif +eos: + { GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS."); gst_buffer_unref (buf); - return GST_FLOW_ERROR; - } else { + return GST_FLOW_UNEXPECTED; + } +read_error: + { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("read on file descriptor: %s.", g_strerror (errno))); GST_DEBUG_OBJECT (psrc, "Error reading from fd"); diff --git a/gst/elements/gstfdsrc.h b/gst/elements/gstfdsrc.h index 0004242..fe15195 100644 --- a/gst/elements/gstfdsrc.h +++ b/gst/elements/gstfdsrc.h @@ -52,11 +52,9 @@ struct _GstFdSrc { /* fd */ gint fd; + gint control_sock[2]; + gulong curoffset; /* current offset in file */ - gulong blocksize; /* bytes per read */ - guint64 timeout; /* read timeout, in nanoseconds */ - - gulong seq; /* buffer sequence number */ }; struct _GstFdSrcClass { diff --git a/plugins/elements/gstfdsrc.c b/plugins/elements/gstfdsrc.c index dfc77a5..2666191 100644 --- a/plugins/elements/gstfdsrc.c +++ b/plugins/elements/gstfdsrc.c @@ -28,6 +28,7 @@ #include #include +#include #include #include #ifdef HAVE_UNISTD_H @@ -38,6 +39,24 @@ #include "gstfdsrc.h" +/* the select call is also performed on the control sockets, that way + * we can send special commands to unblock the select call */ +#define CONTROL_STOP 'S' /* stop the select call */ +#define CONTROL_SOCKETS(src) src->control_sock +#define WRITE_SOCKET(src) src->control_sock[1] +#define READ_SOCKET(src) src->control_sock[0] + +#define SEND_COMMAND(src, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (WRITE_SOCKET(src), &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(src, command, res) \ +G_STMT_START { \ + res = read(READ_SOCKET(src), &command, 1); \ +} G_STMT_END + #define DEFAULT_BLOCKSIZE 4096 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -69,8 +88,6 @@ enum ARG_TIMEOUT }; -static guint gst_fdsrc_signals[LAST_SIGNAL] = { 0 }; - #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_fdsrc_debug, "fdsrc", 0, "fdsrc element"); @@ -82,8 +99,9 @@ static void gst_fdsrc_set_property (GObject * object, guint prop_id, static void gst_fdsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstStateChangeReturn gst_fdsrc_change_state (GstElement * element, - GstStateChange transition); +static gboolean gst_fdsrc_start (GstBaseSrc * bsrc); +static gboolean gst_fdsrc_stop (GstBaseSrc * bsrc); +static gboolean gst_fdsrc_unlock (GstBaseSrc * bsrc); static GstFlowReturn gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf); @@ -101,10 +119,11 @@ gst_fdsrc_class_init (GstFdSrcClass * klass) { GObjectClass *gobject_class; GstBaseSrcClass *gstbasesrc_class; - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstElementClass *gstelement_class; GstPushSrcClass *gstpush_src_class; gobject_class = G_OBJECT_CLASS (klass); + gstelement_class = GST_ELEMENT_CLASS (klass); gstbasesrc_class = (GstBaseSrcClass *) klass; gstpush_src_class = (GstPushSrcClass *) klass; @@ -116,23 +135,12 @@ gst_fdsrc_class_init (GstFdSrcClass * klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_FD, g_param_spec_int ("fd", "fd", "An open file descriptor to read from", 0, G_MAXINT, 0, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCKSIZE, - g_param_spec_ulong ("blocksize", "Block size", - "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE, - G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, - g_param_spec_uint64 ("timeout", "Timeout", "Read timeout in nanoseconds", - 0, G_MAXUINT64, 0, G_PARAM_READWRITE)); - gst_fdsrc_signals[SIGNAL_TIMEOUT] = - g_signal_new ("timeout", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, - G_STRUCT_OFFSET (GstFdSrcClass, timeout), NULL, NULL, - g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - gstelement_class->change_state = gst_fdsrc_change_state; + gstbasesrc_class->start = gst_fdsrc_start; + gstbasesrc_class->stop = gst_fdsrc_stop; + gstbasesrc_class->unlock = gst_fdsrc_unlock; gstpush_src_class->create = gst_fdsrc_create; - } static void @@ -144,36 +152,57 @@ gst_fdsrc_init (GstFdSrc * fdsrc, GstFdSrcClass * klass) fdsrc->fd = 0; fdsrc->curoffset = 0; - fdsrc->blocksize = DEFAULT_BLOCKSIZE; - fdsrc->timeout = 0; - fdsrc->seq = 0; } -static GstStateChangeReturn -gst_fdsrc_change_state (GstElement * element, GstStateChange transition) + +static gboolean +gst_fdsrc_start (GstBaseSrc * bsrc) { - GstFdSrc *src = GST_FDSRC (element); + GstFdSrc *src = GST_FDSRC (bsrc); + gint control_sock[2]; - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - src->curoffset = 0; - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - break; - default: - break; + src->curoffset = 0; + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + goto socket_pair; + + READ_SOCKET (src) = control_sock[0]; + WRITE_SOCKET (src) = control_sock[1]; + + fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); + + return TRUE; + + /* ERRORS */ +socket_pair: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + GST_ERROR_SYSTEM); + return FALSE; } +} + +static gboolean +gst_fdsrc_stop (GstBaseSrc * bsrc) +{ + GstFdSrc *src = GST_FDSRC (bsrc); - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + close (READ_SOCKET (src)); + close (WRITE_SOCKET (src)); - return GST_STATE_CHANGE_SUCCESS; + return TRUE; } +static gboolean +gst_fdsrc_unlock (GstBaseSrc * bsrc) +{ + GstFdSrc *src = GST_FDSRC (bsrc); + + SEND_COMMAND (src, CONTROL_STOP); + + return TRUE; +} static void gst_fdsrc_set_property (GObject * object, guint prop_id, const GValue * value, @@ -189,12 +218,6 @@ gst_fdsrc_set_property (GObject * object, guint prop_id, const GValue * value, case ARG_FD: src->fd = g_value_get_int (value); break; - case ARG_BLOCKSIZE: - src->blocksize = g_value_get_ulong (value); - break; - case ARG_TIMEOUT: - src->timeout = g_value_get_uint64 (value); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -212,34 +235,25 @@ gst_fdsrc_get_property (GObject * object, guint prop_id, GValue * value, src = GST_FDSRC (object); switch (prop_id) { - case ARG_BLOCKSIZE: - g_value_set_ulong (value, src->blocksize); - break; case ARG_FD: g_value_set_int (value, src->fd); break; - case ARG_TIMEOUT: - g_value_set_uint64 (value, src->timeout); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } -#define SELECT_TIMEOUT (GST_SECOND / 20) - static GstFlowReturn gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstFdSrc *src; GstBuffer *buf; glong readbytes; - GstClockTime timeout; + guint blocksize; #ifndef HAVE_WIN32 fd_set readfds; - struct timeval t, *tp = &t; gint retval; #endif @@ -248,65 +262,82 @@ gst_fdsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) #ifndef HAVE_WIN32 FD_ZERO (&readfds); FD_SET (src->fd, &readfds); - - if (src->timeout != 0) { - timeout = MIN (SELECT_TIMEOUT, src->timeout); - } else { - timeout = SELECT_TIMEOUT; - } + FD_SET (READ_SOCKET (src), &readfds); do { - GST_TIME_TO_TIMEVAL (timeout, t); - if (src->timeout != 0) - timeout -= MIN (timeout, SELECT_TIMEOUT); - - retval = select (src->fd + 1, &readfds, NULL, NULL, tp); - - /* Check whether the element got shutdown before full timeout */ - if (retval == 0) { - if (GST_PAD_IS_FLUSHING (GST_BASE_SRC_PAD (src))) { - GST_DEBUG_OBJECT (src, "Shutting down with no buffer."); - return GST_FLOW_WRONG_STATE; + retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL); + } while ((retval == -1 && errno == EINTR)); + + if (retval == -1) + goto select_error; + + if (FD_ISSET (READ_SOCKET (src), &readfds)) { + /* read all stop commands */ + while (TRUE) { + gchar command; + int res; + + READ_COMMAND (src, command, res); + if (res < 0) { + GST_LOG_OBJECT (src, "no more commands"); + /* no more commands */ + break; } } - } while ((retval == -1 && errno == EINTR) || /* retry if interrupted */ - (retval == 0 && timeout > 0)); /* Retry on incomplete timeout */ - - if (retval == -1) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select on file descriptor: %s.", g_strerror (errno))); - GST_DEBUG_OBJECT (psrc, "Error during select"); - return GST_FLOW_ERROR; - } else if (retval == 0) { - g_signal_emit (G_OBJECT (src), gst_fdsrc_signals[SIGNAL_TIMEOUT], 0); - GST_DEBUG_OBJECT (psrc, "Timeout in select"); - return GST_FLOW_ERROR; + goto stopped; } #endif + blocksize = GST_BASE_SRC (src)->blocksize; + /* create the buffer */ - buf = gst_buffer_new_and_alloc (src->blocksize); + buf = gst_buffer_new_and_alloc (blocksize); do { - readbytes = read (src->fd, GST_BUFFER_DATA (buf), src->blocksize); + readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize); } while (readbytes == -1 && errno == EINTR); /* retry if interrupted */ - if (readbytes > 0) { - GST_BUFFER_OFFSET (buf) = src->curoffset; - GST_BUFFER_SIZE (buf) = readbytes; - GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; - src->curoffset += readbytes; + if (readbytes < 0) + goto read_error; + + if (readbytes == 0) + goto eos; - GST_DEBUG_OBJECT (psrc, "Read buffer of size %u.", readbytes); + GST_BUFFER_OFFSET (buf) = src->curoffset; + GST_BUFFER_SIZE (buf) = readbytes; + GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE; + src->curoffset += readbytes; - /* we're done, return the buffer */ - *outbuf = buf; - return GST_FLOW_OK; - } else if (readbytes == 0) { + GST_DEBUG_OBJECT (psrc, "Read buffer of size %u.", readbytes); + + /* we're done, return the buffer */ + *outbuf = buf; + + return GST_FLOW_OK; + + /* ERRORS */ +#ifndef HAVE_WIN32 +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("select on file descriptor: %s.", g_strerror (errno))); + GST_DEBUG_OBJECT (psrc, "Error during select"); + return GST_FLOW_ERROR; + } +stopped: + { + GST_DEBUG_OBJECT (psrc, "Select stopped"); + return GST_FLOW_WRONG_STATE; + } +#endif +eos: + { GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS."); gst_buffer_unref (buf); - return GST_FLOW_ERROR; - } else { + return GST_FLOW_UNEXPECTED; + } +read_error: + { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("read on file descriptor: %s.", g_strerror (errno))); GST_DEBUG_OBJECT (psrc, "Error reading from fd"); diff --git a/plugins/elements/gstfdsrc.h b/plugins/elements/gstfdsrc.h index 0004242..fe15195 100644 --- a/plugins/elements/gstfdsrc.h +++ b/plugins/elements/gstfdsrc.h @@ -52,11 +52,9 @@ struct _GstFdSrc { /* fd */ gint fd; + gint control_sock[2]; + gulong curoffset; /* current offset in file */ - gulong blocksize; /* bytes per read */ - guint64 timeout; /* read timeout, in nanoseconds */ - - gulong seq; /* buffer sequence number */ }; struct _GstFdSrcClass { -- 2.7.4