2005-09-27 Andy Wingo <wingo@pobox.com>
+ * gst/tcp/gsttcpserversrc.c:
+ * gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API.
+
+ * gst/tcp/gsttcp.h:
+ * gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored
+ out of tcpclientsrc.c. Cancellable.
+ (gst_tcp_socket_read): Made private, cancellable, with better
+ diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*.
+ (gst_tcp_gdp_read_buffer): Made cancellable, actually returns the
+ whole buffer, and better diagnostics.
+ (gst_tcp_gdp_read_caps): Same.
+
* gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time.
2005-09-26 Andy Wingo <wingo@pobox.com>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_FIONREAD_IN_SYS_FILIO
+#include <sys/filio.h>
+#endif
#include <glib.h>
#include <gst/gst.h>
* = 0: EOF
* > 0: bytes read
*/
-gint
-gst_tcp_socket_read (int socket, void *buf, size_t count)
+static GstFlowReturn
+gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
+ int cancel_fd)
{
- size_t bytes_read = 0;
+ fd_set testfds;
+ int maxfdp1;
+ ssize_t n;
+ size_t bytes_read;
+ int num_to_read;
+
+ bytes_read = 0;
while (bytes_read < count) {
- ssize_t ret = read (socket, buf + bytes_read,
- count - bytes_read);
-
- if (ret < 0)
- GST_WARNING ("error while reading: %s", g_strerror (errno));
- if (ret <= 0)
- return bytes_read;
- bytes_read += ret;
+ /* do a blocking select on the socket */
+ FD_ZERO (&testfds);
+ FD_SET (socket, &testfds);
+ if (cancel_fd >= 0)
+ FD_SET (cancel_fd, &testfds);
+ maxfdp1 = MAX (socket, cancel_fd) + 1;
+
+ /* no action (0) is an error too in our case */
+ if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
+ goto select_error;
+
+ if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
+ goto cancelled;
+
+ /* ask how much is available for reading on the socket */
+ if (ioctl (socket, FIONREAD, &num_to_read) < 0)
+ goto ioctl_error;
+
+ /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
+
+ num_to_read = MIN (num_to_read, count - bytes_read);
+
+ n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
+
+ if (n < 0)
+ goto read_error;
+
+ if (n < num_to_read)
+ goto short_read;
+
+ bytes_read += num_to_read;
}
- if (bytes_read < 0)
- GST_WARNING ("error while reading: %s", g_strerror (errno));
- else
- GST_LOG ("read %d bytes succesfully", bytes_read);
- return bytes_read;
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+select_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("select failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+cancelled:
+ {
+ GST_DEBUG_OBJECT (this, "Select was cancelled");
+ return GST_FLOW_WRONG_STATE;
+ }
+ioctl_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("ioctl failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+read_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("read failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+short_read:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("short read: wanted %d bytes, got %d", num_to_read, n));
+ return GST_FLOW_ERROR;
+ }
}
/* close the socket and reset the fd. Used to clean up after errors. */
* - NULL, indicating a connection close or an error, to be handled with
* EOS
*/
-GstBuffer *
-gst_tcp_gdp_read_buffer (GstElement * this, int socket)
+GstFlowReturn
+gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
+ GstBuffer ** buf)
{
- size_t header_length = GST_DP_HEADER_LENGTH;
- size_t readsize;
- guint8 *header = NULL;
- ssize_t ret;
- GstBuffer *buffer;
+ fd_set testfds;
+ int ret;
+ int maxfdp1;
+ ssize_t bytes_read;
+ int readsize;
+
+ *buf = NULL;
+
+ /* do a blocking select on the socket */
+ FD_ZERO (&testfds);
+ FD_SET (socket, &testfds);
+ if (cancel_fd >= 0)
+ FD_SET (cancel_fd, &testfds);
+ maxfdp1 = MAX (socket, cancel_fd) + 1;
+
+ /* no action (0) is an error too in our case */
+ if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
+ goto select_error;
+
+ if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
+ goto cancelled;
- header = g_malloc (header_length);
- readsize = header_length;
+ /* ask how much is available for reading on the socket */
+ if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
+ goto ioctl_error;
- GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
- if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+ /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
+
+ *buf = gst_buffer_new_and_alloc (readsize);
+
+ bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
+
+ if (bytes_read < 0)
goto read_error;
- if (ret != readsize)
+ if (bytes_read < readsize)
+ /* but mom, you promised to give me readsize bytes! */
goto short_read;
- if (!gst_dp_validate_header (header_length, header))
+ GST_DEBUG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (buf));
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+select_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("select failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+cancelled:
+ {
+ GST_DEBUG_OBJECT (this, "Select was cancelled");
+ return GST_FLOW_WRONG_STATE;
+ }
+ioctl_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("ioctl failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+read_error:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("read failed: %s", g_strerror (errno)));
+ gst_buffer_unref (*buf);
+ *buf = NULL;
+ return GST_FLOW_ERROR;
+ }
+short_read:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("short read: wanted %d bytes, got %d", readsize, bytes_read));
+ gst_buffer_unref (*buf);
+ *buf = NULL;
+ return GST_FLOW_ERROR;
+ }
+}
+
+/* read a buffer from the given socket
+ * returns:
+ * - a GstBuffer in which data should be read
+ * - NULL, indicating a connection close or an error, to be handled with
+ * EOS
+ */
+GstFlowReturn
+gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
+ GstBuffer ** buf)
+{
+ GstFlowReturn ret;
+ guint8 *header = NULL;
+
+ GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
+ GST_DP_HEADER_LENGTH);
+
+ *buf = NULL;
+ header = g_malloc (GST_DP_HEADER_LENGTH);
+
+ ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
+ cancel_fd);
+
+ if (ret != GST_FLOW_OK)
+ goto header_read_error;
+
+ if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
goto validate_error;
+ if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
+ goto is_not_buffer;
+
GST_LOG_OBJECT (this, "validated buffer packet header");
- buffer = gst_dp_buffer_from_header (header_length, header);
+ *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
+
g_free (header);
- GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
+ ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
+ GST_BUFFER_SIZE (*buf), cancel_fd);
- return buffer;
+ if (ret != GST_FLOW_OK)
+ goto data_read_error;
+
+ return GST_FLOW_OK;
/* ERRORS */
-read_error:
- {
- if (ret == 0) {
- /* if we read 0 bytes, and we're blocking, we hit eos */
- GST_DEBUG ("blocking read returns 0, returning NULL");
- g_free (header);
- return NULL;
- } else {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- g_free (header);
- return NULL;
- }
- }
-short_read:
+header_read_error:
{
- GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
- g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
- return NULL;
+ g_free (header);
+ return ret;
}
validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
- return NULL;
+ return GST_FLOW_ERROR;
+ }
+is_not_buffer:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("GDP packet contains something that is not a buffer"));
+ g_free (header);
+ return GST_FLOW_ERROR;
+ }
+data_read_error:
+ {
+ gst_buffer_unref (*buf);
+ *buf = NULL;
+ return ret;
}
}
-/* read the GDP caps packet from the given socket
- * returns the caps, or NULL in case of an error */
-GstCaps *
-gst_tcp_gdp_read_caps (GstElement * this, int socket)
+GstFlowReturn
+gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
+ GstCaps ** caps)
{
- size_t header_length = GST_DP_HEADER_LENGTH;
- size_t readsize;
+ GstFlowReturn ret;
guint8 *header = NULL;
guint8 *payload = NULL;
- ssize_t ret;
- GstCaps *caps;
- gchar *string;
+ size_t payload_length;
- header = g_malloc (header_length);
- readsize = header_length;
-
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
- if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
- goto read_error;
+ GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
+ GST_DP_HEADER_LENGTH);
- if (ret != readsize)
- goto short_read;
+ *caps = NULL;
+ header = g_malloc (GST_DP_HEADER_LENGTH);
- if (!gst_dp_validate_header (header_length, header))
- goto validate_error;
+ ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
+ cancel_fd);
- readsize = gst_dp_header_payload_length (header);
- payload = g_malloc (readsize);
+ if (ret != GST_FLOW_OK)
+ goto header_read_error;
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
- if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
- goto socket_read_error;
+ if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
+ goto header_validate_error;
if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
goto is_not_caps;
- g_assert (ret == readsize);
+ GST_LOG_OBJECT (this, "validated caps packet header");
+
+ payload_length = gst_dp_header_payload_length (header);
+ payload = g_malloc (payload_length);
+
+ GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload",
+ payload_length);
- if (!gst_dp_validate_payload (readsize, header, payload))
- goto packet_validate_error;
+ ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
- caps = gst_dp_caps_from_packet (header_length, header, payload);
- string = gst_caps_to_string (caps);
- GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
- g_free (string);
+ if (ret != GST_FLOW_OK)
+ goto payload_read_error;
+
+ if (!gst_dp_validate_payload (payload_length, header, payload))
+ goto payload_validate_error;
+
+ *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
+
+ GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
g_free (header);
g_free (payload);
- return caps;
+ return GST_FLOW_OK;
/* ERRORS */
-read_error:
+header_read_error:
{
- if (ret < 0) {
- g_free (header);
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- return NULL;
- }
- if (ret == 0) {
- GST_WARNING_OBJECT (this, "read returned EOF");
- return NULL;
- }
- }
-short_read:
- {
- GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
- readsize, ret);
- return NULL;
+ g_free (header);
+ return ret;
}
-validate_error:
+header_validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
- return NULL;
+ return GST_FLOW_ERROR;
}
-socket_read_error:
+is_not_caps:
{
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+ ("GDP packet contains something that is not a caps"));
g_free (header);
- g_free (payload);
- return NULL;
+ return GST_FLOW_ERROR;
}
-is_not_caps:
+payload_read_error:
{
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("Header read doesn't describe CAPS payload"));
g_free (header);
g_free (payload);
- return NULL;
+ return ret;
}
-packet_validate_error:
+payload_validate_error:
{
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
- return NULL;
+ return GST_FLOW_ERROR;
}
}
gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
-gint gst_tcp_socket_read (int socket, void *buf, size_t count);
void gst_tcp_socket_close (int *socket);
-GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket);
-GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
-GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket);
+GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+
+GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
+
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPClientSrc *src;
- size_t readsize;
- int ret;
- GstBuffer *buf = NULL;
+ GstFlowReturn ret;
src = GST_TCPCLIENTSRC (psrc);
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
- fd_set testfds;
-
case GST_TCP_PROTOCOL_NONE:
- /* do a blocking select on the socket */
- FD_ZERO (&testfds);
- FD_SET (src->sock_fd, &testfds);
-
- /* no action (0) is an error too in our case */
- if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
- goto select_error;
-
- /* ask how much is available for reading on the socket */
- if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
- goto ioctl_error;
-
- GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
-
- buf = gst_buffer_new_and_alloc (readsize);
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, outbuf);
break;
case GST_TCP_PROTOCOL_GDP:
- if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd)))
- goto hit_eos;
-
- GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
- buf);
-
- /* use this new buffer to read data into */
- readsize = GST_BUFFER_SIZE (buf);
+ ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1,
+ outbuf);
break;
default:
/* need to assert as buf == NULL */
break;
}
- GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
- if ((ret =
- gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf),
- readsize)) < 0)
- goto read_error;
-
- /* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0)
- goto zero_read;
-
- readsize = ret;
- GST_BUFFER_SIZE (buf) = readsize;
-
- src->curoffset += readsize;
-
- GST_LOG_OBJECT (src,
- "Returning buffer from _get of size %d, ts %"
- GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
- ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
- GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
- GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
-
- gst_buffer_set_caps (buf, src->caps);
-
- *outbuf = buf;
+ if (ret == GST_FLOW_OK) {
+ GST_LOG_OBJECT (src,
+ "Returning buffer from _get of size %d, ts %"
+ GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+ ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+ GST_BUFFER_SIZE (*outbuf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
+ GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+
+ gst_buffer_set_caps (*outbuf, src->caps);
+ }
- return GST_FLOW_OK;
+ return ret;
- /* ERRORS */
wrong_state:
{
- GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
- return GST_FLOW_WRONG_STATE;
- }
-select_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-ioctl_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("ioctl failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-hit_eos:
- {
- return GST_FLOW_WRONG_STATE;
- }
-read_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
- }
-zero_read:
- {
- GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
- gst_buffer_unref (buf);
+ GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
return GST_FLOW_WRONG_STATE;
}
}
gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
- GstTCPClientSrc *tcpclientsrc;
-
- g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
- tcpclientsrc = GST_TCPCLIENTSRC (object);
+ GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) {
case ARG_HOST:
gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
- GstTCPClientSrc *tcpclientsrc;
-
- g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
- tcpclientsrc = GST_TCPCLIENTSRC (object);
+ GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
switch (prop_id) {
case ARG_HOST:
if (src->protocol == GST_TCP_PROTOCOL_GDP) {
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
+ GstFlowReturn fret;
GstCaps *caps;
GST_DEBUG_OBJECT (src, "getting caps through GDP");
- if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd)))
- goto no_caps;
+ fret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, -1, &caps);
- if (!GST_IS_CAPS (caps))
+ if (fret != GST_FLOW_OK)
goto no_caps;
- GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
- caps);
-
src->caps_received = TRUE;
src->caps = caps;
}
gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPServerSrc *src;
- size_t readsize;
- int ret;
- GstBuffer *buf = NULL;
- GstCaps *caps;
+ GstFlowReturn ret;
src = GST_TCPSERVERSRC (psrc);
- g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN),
- GST_FLOW_ERROR);
+ if (!GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN))
+ goto wrong_state;
+
+ GST_LOG_OBJECT (src, "asked for a buffer");
- /* read the buffer header if we're using a protocol */
switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE:
- {
- fd_set testfds;
-
- /* do a blocking select on the socket */
- FD_ZERO (&testfds);
- FD_SET (src->client_sock_fd, &testfds);
-
- /* no action (0) is an error too in our case */
- if ((ret =
- select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
- (fd_set *) 0, 0)) <= 0)
- goto select_error;
-
- /* ask how much is available for reading on the socket */
- if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
- goto ioctl_error;
-
- buf = gst_buffer_new_and_alloc (readsize);
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
+ outbuf);
break;
- }
+
case GST_TCP_PROTOCOL_GDP:
- /* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
+ GstCaps *caps;
gchar *string;
- if (!(caps =
- gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd)))
+ ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd, -1,
+ &caps);
+
+ if (ret != GST_FLOW_OK)
goto gdp_caps_read_error;
src->caps_received = TRUE;
gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
}
- /* now receive the buffer header */
- if (!(buf =
- gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
- goto gdp_buffer_read_error;
+ ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
+ outbuf);
- GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
- buf);
+ if (ret == GST_FLOW_OK)
+ gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
- /* use this new buffer to read data into */
- readsize = GST_BUFFER_SIZE (buf);
break;
+
default:
- g_warning ("Unhandled protocol type");
+ /* need to assert as buf == NULL */
+ g_assert ("Unhandled protocol type");
break;
}
- GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
- if ((ret =
- gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
- readsize)) < 0)
- goto read_error;
-
- /* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0)
- goto hit_eos;
-
- readsize = ret;
- GST_LOG_OBJECT (src, "Read %d bytes", readsize);
- GST_BUFFER_SIZE (buf) = readsize;
- GST_BUFFER_OFFSET (buf) = src->curoffset;
- GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
- src->curoffset += readsize;
-
- *outbuf = buf;
+ if (ret == GST_FLOW_OK) {
+ GST_LOG_OBJECT (src,
+ "Returning buffer from _get of size %d, ts %"
+ GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+ ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+ GST_BUFFER_SIZE (*outbuf),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
+ GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+ }
- return GST_FLOW_OK;
+ return ret;
- /* ERROR */
-select_error:
+wrong_state:
{
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
- }
-ioctl_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("ioctl failed: %s", g_strerror (errno)));
- return GST_FLOW_ERROR;
+ GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
+ return GST_FLOW_WRONG_STATE;
}
gdp_caps_read_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not read caps through GDP"));
- return GST_FLOW_ERROR;
- }
-gdp_buffer_read_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read buffer header through GDP"));
- return GST_FLOW_ERROR;
- }
-read_error:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
- }
-hit_eos:
- {
- GST_DEBUG ("blocking read returns 0, EOS");
- gst_buffer_unref (buf);
- return GST_FLOW_WRONG_STATE;
+ return ret;
}
}
-
static void
gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)