From: Andy Wingo Date: Tue, 27 Sep 2005 16:37:12 +0000 (+0000) Subject: gst/tcp/: Updated for new gsttcp API. X-Git-Tag: 1.19.3~511^2~12706 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=21881814bcd73dc2493cdb5eec97dd846c6fa84a;p=platform%2Fupstream%2Fgstreamer.git gst/tcp/: Updated for new gsttcp API. Original commit message from CVS: 2005-09-27 Andy Wingo * gst/tcp/gsttcpserversrc.c: * gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API. * gst/tcp/gsttcp.h: * gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored out of tcpclientsrc.c. Cancellable. (gst_tcp_socket_read): Made private, cancellable, with better diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*. (gst_tcp_gdp_read_buffer): Made cancellable, actually returns the whole buffer, and better diagnostics. (gst_tcp_gdp_read_caps): Same. * gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time. --- diff --git a/ChangeLog b/ChangeLog index 23567d3..b73400d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,17 @@ 2005-09-27 Andy Wingo + * gst/tcp/gsttcpserversrc.c: + * gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API. + + * gst/tcp/gsttcp.h: + * gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored + out of tcpclientsrc.c. Cancellable. + (gst_tcp_socket_read): Made private, cancellable, with better + diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*. + (gst_tcp_gdp_read_buffer): Made cancellable, actually returns the + whole buffer, and better diagnostics. + (gst_tcp_gdp_read_caps): Same. + * gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time. 2005-09-26 Andy Wingo diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c index 99eee84..58da86c 100644 --- a/gst/tcp/gsttcp.c +++ b/gst/tcp/gsttcp.c @@ -30,6 +30,11 @@ #include #include #include +#include + +#ifdef HAVE_FIONREAD_IN_SYS_FILIO +#include +#endif #include #include @@ -125,27 +130,84 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count) * = 0: EOF * > 0: bytes read */ -gint -gst_tcp_socket_read (int socket, void *buf, size_t count) +static GstFlowReturn +gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count, + int cancel_fd) { - size_t bytes_read = 0; + fd_set testfds; + int maxfdp1; + ssize_t n; + size_t bytes_read; + int num_to_read; + + bytes_read = 0; while (bytes_read < count) { - ssize_t ret = read (socket, buf + bytes_read, - count - bytes_read); - - if (ret < 0) - GST_WARNING ("error while reading: %s", g_strerror (errno)); - if (ret <= 0) - return bytes_read; - bytes_read += ret; + /* do a blocking select on the socket */ + FD_ZERO (&testfds); + FD_SET (socket, &testfds); + if (cancel_fd >= 0) + FD_SET (cancel_fd, &testfds); + maxfdp1 = MAX (socket, cancel_fd) + 1; + + /* no action (0) is an error too in our case */ + if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0) + goto select_error; + + if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds)) + goto cancelled; + + /* ask how much is available for reading on the socket */ + if (ioctl (socket, FIONREAD, &num_to_read) < 0) + goto ioctl_error; + + /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */ + + num_to_read = MIN (num_to_read, count - bytes_read); + + n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read); + + if (n < 0) + goto read_error; + + if (n < num_to_read) + goto short_read; + + bytes_read += num_to_read; } - if (bytes_read < 0) - GST_WARNING ("error while reading: %s", g_strerror (errno)); - else - GST_LOG ("read %d bytes succesfully", bytes_read); - return bytes_read; + return GST_FLOW_OK; + + /* ERRORS */ +select_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +cancelled: + { + GST_DEBUG_OBJECT (this, "Select was cancelled"); + return GST_FLOW_WRONG_STATE; + } +ioctl_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("ioctl failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +read_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("read failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +short_read: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("short read: wanted %d bytes, got %d", num_to_read, n)); + return GST_FLOW_ERROR; + } } /* close the socket and reset the fd. Used to clean up after errors. */ @@ -162,165 +224,246 @@ gst_tcp_socket_close (int *socket) * - NULL, indicating a connection close or an error, to be handled with * EOS */ -GstBuffer * -gst_tcp_gdp_read_buffer (GstElement * this, int socket) +GstFlowReturn +gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, + GstBuffer ** buf) { - size_t header_length = GST_DP_HEADER_LENGTH; - size_t readsize; - guint8 *header = NULL; - ssize_t ret; - GstBuffer *buffer; + fd_set testfds; + int ret; + int maxfdp1; + ssize_t bytes_read; + int readsize; + + *buf = NULL; + + /* do a blocking select on the socket */ + FD_ZERO (&testfds); + FD_SET (socket, &testfds); + if (cancel_fd >= 0) + FD_SET (cancel_fd, &testfds); + maxfdp1 = MAX (socket, cancel_fd) + 1; + + /* no action (0) is an error too in our case */ + if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0) + goto select_error; + + if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds)) + goto cancelled; - header = g_malloc (header_length); - readsize = header_length; + /* ask how much is available for reading on the socket */ + if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0) + goto ioctl_error; - GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize); - if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) + /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */ + + *buf = gst_buffer_new_and_alloc (readsize); + + bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize); + + if (bytes_read < 0) goto read_error; - if (ret != readsize) + if (bytes_read < readsize) + /* but mom, you promised to give me readsize bytes! */ goto short_read; - if (!gst_dp_validate_header (header_length, header)) + GST_DEBUG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (buf)); + return GST_FLOW_OK; + + /* ERRORS */ +select_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +cancelled: + { + GST_DEBUG_OBJECT (this, "Select was cancelled"); + return GST_FLOW_WRONG_STATE; + } +ioctl_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("ioctl failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +read_error: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("read failed: %s", g_strerror (errno))); + gst_buffer_unref (*buf); + *buf = NULL; + return GST_FLOW_ERROR; + } +short_read: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("short read: wanted %d bytes, got %d", readsize, bytes_read)); + gst_buffer_unref (*buf); + *buf = NULL; + return GST_FLOW_ERROR; + } +} + +/* read a buffer from the given socket + * returns: + * - a GstBuffer in which data should be read + * - NULL, indicating a connection close or an error, to be handled with + * EOS + */ +GstFlowReturn +gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, + GstBuffer ** buf) +{ + GstFlowReturn ret; + guint8 *header = NULL; + + GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", + GST_DP_HEADER_LENGTH); + + *buf = NULL; + header = g_malloc (GST_DP_HEADER_LENGTH); + + ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, + cancel_fd); + + if (ret != GST_FLOW_OK) + goto header_read_error; + + if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) goto validate_error; + if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER) + goto is_not_buffer; + GST_LOG_OBJECT (this, "validated buffer packet header"); - buffer = gst_dp_buffer_from_header (header_length, header); + *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header); + g_free (header); - GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer); + ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf), + GST_BUFFER_SIZE (*buf), cancel_fd); - return buffer; + if (ret != GST_FLOW_OK) + goto data_read_error; + + return GST_FLOW_OK; /* ERRORS */ -read_error: - { - if (ret == 0) { - /* if we read 0 bytes, and we're blocking, we hit eos */ - GST_DEBUG ("blocking read returns 0, returning NULL"); - g_free (header); - return NULL; - } else { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - g_free (header); - return NULL; - } - } -short_read: +header_read_error: { - GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); - g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); - return NULL; + g_free (header); + return ret; } validate_error: { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP buffer packet header does not validate")); g_free (header); - return NULL; + return GST_FLOW_ERROR; + } +is_not_buffer: + { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("GDP packet contains something that is not a buffer")); + g_free (header); + return GST_FLOW_ERROR; + } +data_read_error: + { + gst_buffer_unref (*buf); + *buf = NULL; + return ret; } } -/* read the GDP caps packet from the given socket - * returns the caps, or NULL in case of an error */ -GstCaps * -gst_tcp_gdp_read_caps (GstElement * this, int socket) +GstFlowReturn +gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, + GstCaps ** caps) { - size_t header_length = GST_DP_HEADER_LENGTH; - size_t readsize; + GstFlowReturn ret; guint8 *header = NULL; guint8 *payload = NULL; - ssize_t ret; - GstCaps *caps; - gchar *string; + size_t payload_length; - header = g_malloc (header_length); - readsize = header_length; - - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize); - if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) - goto read_error; + GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", + GST_DP_HEADER_LENGTH); - if (ret != readsize) - goto short_read; + *caps = NULL; + header = g_malloc (GST_DP_HEADER_LENGTH); - if (!gst_dp_validate_header (header_length, header)) - goto validate_error; + ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, + cancel_fd); - readsize = gst_dp_header_payload_length (header); - payload = g_malloc (readsize); + if (ret != GST_FLOW_OK) + goto header_read_error; - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize); - if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0) - goto socket_read_error; + if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) + goto header_validate_error; if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) goto is_not_caps; - g_assert (ret == readsize); + GST_LOG_OBJECT (this, "validated caps packet header"); + + payload_length = gst_dp_header_payload_length (header); + payload = g_malloc (payload_length); + + GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", + payload_length); - if (!gst_dp_validate_payload (readsize, header, payload)) - goto packet_validate_error; + ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd); - caps = gst_dp_caps_from_packet (header_length, header, payload); - string = gst_caps_to_string (caps); - GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string); - g_free (string); + if (ret != GST_FLOW_OK) + goto payload_read_error; + + if (!gst_dp_validate_payload (payload_length, header, payload)) + goto payload_validate_error; + + *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload); + + GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps); g_free (header); g_free (payload); - return caps; + return GST_FLOW_OK; /* ERRORS */ -read_error: +header_read_error: { - if (ret < 0) { - g_free (header); - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - return NULL; - } - if (ret == 0) { - GST_WARNING_OBJECT (this, "read returned EOF"); - return NULL; - } - } -short_read: - { - GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes", - readsize, ret); - return NULL; + g_free (header); + return ret; } -validate_error: +header_validate_error: { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP caps packet header does not validate")); g_free (header); - return NULL; + return GST_FLOW_ERROR; } -socket_read_error: +is_not_caps: { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), + ("GDP packet contains something that is not a caps")); g_free (header); - g_free (payload); - return NULL; + return GST_FLOW_ERROR; } -is_not_caps: +payload_read_error: { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("Header read doesn't describe CAPS payload")); g_free (header); g_free (payload); - return NULL; + return ret; } -packet_validate_error: +payload_validate_error: { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP caps packet payload does not validate")); g_free (header); g_free (payload); - return NULL; + return GST_FLOW_ERROR; } } diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h index 818def8..1c2c1bc 100644 --- a/gst/tcp/gsttcp.h +++ b/gst/tcp/gsttcp.h @@ -42,13 +42,15 @@ typedef enum gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host); gint gst_tcp_socket_write (int socket, const void *buf, size_t count); -gint gst_tcp_socket_read (int socket, void *buf, size_t count); void gst_tcp_socket_close (int *socket); -GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket); -GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket); -GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket); +GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf); + +GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf); +GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps); + +GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd); gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port); diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index 7f950ad..c422704 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -212,9 +212,7 @@ static GstFlowReturn gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPClientSrc *src; - size_t readsize; - int ret; - GstBuffer *buf = NULL; + GstFlowReturn ret; src = GST_TCPCLIENTSRC (psrc); @@ -225,35 +223,13 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) /* read the buffer header if we're using a protocol */ switch (src->protocol) { - fd_set testfds; - case GST_TCP_PROTOCOL_NONE: - /* do a blocking select on the socket */ - FD_ZERO (&testfds); - FD_SET (src->sock_fd, &testfds); - - /* no action (0) is an error too in our case */ - if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0) - goto select_error; - - /* ask how much is available for reading on the socket */ - if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0) - goto ioctl_error; - - GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize); - - buf = gst_buffer_new_and_alloc (readsize); + ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, outbuf); break; case GST_TCP_PROTOCOL_GDP: - if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd))) - goto hit_eos; - - GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", - buf); - - /* use this new buffer to read data into */ - readsize = GST_BUFFER_SIZE (buf); + ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, + outbuf); break; default: /* need to assert as buf == NULL */ @@ -261,67 +237,24 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) break; } - GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize); - if ((ret = - gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), - readsize)) < 0) - goto read_error; - - /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) - goto zero_read; - - readsize = ret; - GST_BUFFER_SIZE (buf) = readsize; - - src->curoffset += readsize; - - GST_LOG_OBJECT (src, - "Returning buffer from _get of size %d, ts %" - GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT - ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), - GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf)); - - gst_buffer_set_caps (buf, src->caps); - - *outbuf = buf; + if (ret == GST_FLOW_OK) { + GST_LOG_OBJECT (src, + "Returning buffer from _get of size %d, ts %" + GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT + ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, + GST_BUFFER_SIZE (*outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), + GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); + + gst_buffer_set_caps (*outbuf, src->caps); + } - return GST_FLOW_OK; + return ret; - /* ERRORS */ wrong_state: { - GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data"); - return GST_FLOW_WRONG_STATE; - } -select_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -ioctl_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -hit_eos: - { - return GST_FLOW_WRONG_STATE; - } -read_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unref (buf); - return GST_FLOW_ERROR; - } -zero_read: - { - GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS"); - gst_buffer_unref (buf); + GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); return GST_FLOW_WRONG_STATE; } } @@ -330,10 +263,7 @@ static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstTCPClientSrc *tcpclientsrc; - - g_return_if_fail (GST_IS_TCPCLIENTSRC (object)); - tcpclientsrc = GST_TCPCLIENTSRC (object); + GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object); switch (prop_id) { case ARG_HOST: @@ -361,10 +291,7 @@ static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstTCPClientSrc *tcpclientsrc; - - g_return_if_fail (GST_IS_TCPCLIENTSRC (object)); - tcpclientsrc = GST_TCPCLIENTSRC (object); + GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object); switch (prop_id) { case ARG_HOST: @@ -440,18 +367,15 @@ gst_tcpclientsrc_start (GstBaseSrc * bsrc) if (src->protocol == GST_TCP_PROTOCOL_GDP) { /* if we haven't received caps yet, we should get them first */ if (!src->caps_received) { + GstFlowReturn fret; GstCaps *caps; GST_DEBUG_OBJECT (src, "getting caps through GDP"); - if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd))) - goto no_caps; + fret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, -1, &caps); - if (!GST_IS_CAPS (caps)) + if (fret != GST_FLOW_OK) goto no_caps; - GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT, - caps); - src->caps_received = TRUE; src->caps = caps; } diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index 1d5aeed..0044447 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -186,46 +186,30 @@ static GstFlowReturn gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPServerSrc *src; - size_t readsize; - int ret; - GstBuffer *buf = NULL; - GstCaps *caps; + GstFlowReturn ret; src = GST_TCPSERVERSRC (psrc); - g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), - GST_FLOW_ERROR); + if (!GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN)) + goto wrong_state; + + GST_LOG_OBJECT (src, "asked for a buffer"); - /* read the buffer header if we're using a protocol */ switch (src->protocol) { case GST_TCP_PROTOCOL_NONE: - { - fd_set testfds; - - /* do a blocking select on the socket */ - FD_ZERO (&testfds); - FD_SET (src->client_sock_fd, &testfds); - - /* no action (0) is an error too in our case */ - if ((ret = - select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, - (fd_set *) 0, 0)) <= 0) - goto select_error; - - /* ask how much is available for reading on the socket */ - if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0) - goto ioctl_error; - - buf = gst_buffer_new_and_alloc (readsize); + ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1, + outbuf); break; - } + case GST_TCP_PROTOCOL_GDP: - /* if we haven't received caps yet, we should get them first */ if (!src->caps_received) { + GstCaps *caps; gchar *string; - if (!(caps = - gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd))) + ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd, -1, + &caps); + + if (ret != GST_FLOW_OK) goto gdp_caps_read_error; src->caps_received = TRUE; @@ -236,83 +220,46 @@ gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps); } - /* now receive the buffer header */ - if (!(buf = - gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd))) - goto gdp_buffer_read_error; + ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1, + outbuf); - GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", - buf); + if (ret == GST_FLOW_OK) + gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src))); - /* use this new buffer to read data into */ - readsize = GST_BUFFER_SIZE (buf); break; + default: - g_warning ("Unhandled protocol type"); + /* need to assert as buf == NULL */ + g_assert ("Unhandled protocol type"); break; } - GST_LOG_OBJECT (src, "Reading %d bytes", readsize); - if ((ret = - gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf), - readsize)) < 0) - goto read_error; - - /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) - goto hit_eos; - - readsize = ret; - GST_LOG_OBJECT (src, "Read %d bytes", readsize); - GST_BUFFER_SIZE (buf) = readsize; - GST_BUFFER_OFFSET (buf) = src->curoffset; - GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize; - src->curoffset += readsize; - - *outbuf = buf; + if (ret == GST_FLOW_OK) { + GST_LOG_OBJECT (src, + "Returning buffer from _get of size %d, ts %" + GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT + ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, + GST_BUFFER_SIZE (*outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), + GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); + } - return GST_FLOW_OK; + return ret; - /* ERROR */ -select_error: +wrong_state: { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -ioctl_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; + GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); + return GST_FLOW_WRONG_STATE; } gdp_caps_read_error: { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Could not read caps through GDP")); - return GST_FLOW_ERROR; - } -gdp_buffer_read_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read buffer header through GDP")); - return GST_FLOW_ERROR; - } -read_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unref (buf); - return GST_FLOW_ERROR; - } -hit_eos: - { - GST_DEBUG ("blocking read returns 0, EOS"); - gst_buffer_unref (buf); - return GST_FLOW_WRONG_STATE; + return ret; } } - static void gst_tcpserversrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec)