gst/tcp/: Updated for new gsttcp API.
authorAndy Wingo <wingo@pobox.com>
Tue, 27 Sep 2005 16:37:12 +0000 (16:37 +0000)
committerAndy Wingo <wingo@pobox.com>
Tue, 27 Sep 2005 16:37:12 +0000 (16:37 +0000)
Original commit message from CVS:
2005-09-27  Andy Wingo  <wingo@pobox.com>

* gst/tcp/gsttcpserversrc.c:
* gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API.

* gst/tcp/gsttcp.h:
* gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored
out of tcpclientsrc.c. Cancellable.
(gst_tcp_socket_read): Made private, cancellable, with better
diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*.
(gst_tcp_gdp_read_buffer): Made cancellable, actually returns the
whole buffer, and better diagnostics.
(gst_tcp_gdp_read_caps): Same.

* gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time.

ChangeLog
gst/tcp/gsttcp.c
gst/tcp/gsttcp.h
gst/tcp/gsttcpclientsrc.c
gst/tcp/gsttcpserversrc.c

index 23567d3..b73400d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,17 @@
 2005-09-27  Andy Wingo  <wingo@pobox.com>
 
+       * gst/tcp/gsttcpserversrc.c: 
+       * gst/tcp/gsttcpclientsrc.c: Updated for new gsttcp API.
+
+       * gst/tcp/gsttcp.h: 
+       * gst/tcp/gsttcp.c (gst_tcp_read_buffer): New function, factored
+       out of tcpclientsrc.c. Cancellable.
+       (gst_tcp_socket_read): Made private, cancellable, with better
+       diagnostics. Also the FIONREAD ioctl takes a int*, not a size_t*.
+       (gst_tcp_gdp_read_buffer): Made cancellable, actually returns the
+       whole buffer, and better diagnostics.
+       (gst_tcp_gdp_read_caps): Same.
+
        * gst/sine/gstsinesrc.c (gst_sinesrc_wait): Add the base time.
 
 2005-09-26  Andy Wingo  <wingo@pobox.com>
index 99eee84..58da86c 100644 (file)
 #include <arpa/inet.h>
 #include <netdb.h>
 #include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_FIONREAD_IN_SYS_FILIO
+#include <sys/filio.h>
+#endif
 
 #include <glib.h>
 #include <gst/gst.h>
@@ -125,27 +130,84 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count)
  * = 0: EOF
  * > 0: bytes read
  */
-gint
-gst_tcp_socket_read (int socket, void *buf, size_t count)
+static GstFlowReturn
+gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
+    int cancel_fd)
 {
-  size_t bytes_read = 0;
+  fd_set testfds;
+  int maxfdp1;
+  ssize_t n;
+  size_t bytes_read;
+  int num_to_read;
+
+  bytes_read = 0;
 
   while (bytes_read < count) {
-    ssize_t ret = read (socket, buf + bytes_read,
-        count - bytes_read);
-
-    if (ret < 0)
-      GST_WARNING ("error while reading: %s", g_strerror (errno));
-    if (ret <= 0)
-      return bytes_read;
-    bytes_read += ret;
+    /* do a blocking select on the socket */
+    FD_ZERO (&testfds);
+    FD_SET (socket, &testfds);
+    if (cancel_fd >= 0)
+      FD_SET (cancel_fd, &testfds);
+    maxfdp1 = MAX (socket, cancel_fd) + 1;
+
+    /* no action (0) is an error too in our case */
+    if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
+      goto select_error;
+
+    if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
+      goto cancelled;
+
+    /* ask how much is available for reading on the socket */
+    if (ioctl (socket, FIONREAD, &num_to_read) < 0)
+      goto ioctl_error;
+
+    /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */
+
+    num_to_read = MIN (num_to_read, count - bytes_read);
+
+    n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read);
+
+    if (n < 0)
+      goto read_error;
+
+    if (n < num_to_read)
+      goto short_read;
+
+    bytes_read += num_to_read;
   }
 
-  if (bytes_read < 0)
-    GST_WARNING ("error while reading: %s", g_strerror (errno));
-  else
-    GST_LOG ("read %d bytes succesfully", bytes_read);
-  return bytes_read;
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+select_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("select failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+cancelled:
+  {
+    GST_DEBUG_OBJECT (this, "Select was cancelled");
+    return GST_FLOW_WRONG_STATE;
+  }
+ioctl_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("ioctl failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+read_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("read failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+short_read:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("short read: wanted %d bytes, got %d", num_to_read, n));
+    return GST_FLOW_ERROR;
+  }
 }
 
 /* close the socket and reset the fd.  Used to clean up after errors. */
@@ -162,165 +224,246 @@ gst_tcp_socket_close (int *socket)
  * - NULL, indicating a connection close or an error, to be handled with
  *         EOS
  */
-GstBuffer *
-gst_tcp_gdp_read_buffer (GstElement * this, int socket)
+GstFlowReturn
+gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
+    GstBuffer ** buf)
 {
-  size_t header_length = GST_DP_HEADER_LENGTH;
-  size_t readsize;
-  guint8 *header = NULL;
-  ssize_t ret;
-  GstBuffer *buffer;
+  fd_set testfds;
+  int ret;
+  int maxfdp1;
+  ssize_t bytes_read;
+  int readsize;
+
+  *buf = NULL;
+
+  /* do a blocking select on the socket */
+  FD_ZERO (&testfds);
+  FD_SET (socket, &testfds);
+  if (cancel_fd >= 0)
+    FD_SET (cancel_fd, &testfds);
+  maxfdp1 = MAX (socket, cancel_fd) + 1;
+
+  /* no action (0) is an error too in our case */
+  if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
+    goto select_error;
+
+  if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
+    goto cancelled;
 
-  header = g_malloc (header_length);
-  readsize = header_length;
+  /* ask how much is available for reading on the socket */
+  if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
+    goto ioctl_error;
 
-  GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
-  if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+  /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */
+
+  *buf = gst_buffer_new_and_alloc (readsize);
+
+  bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize);
+
+  if (bytes_read < 0)
     goto read_error;
 
-  if (ret != readsize)
+  if (bytes_read < readsize)
+    /* but mom, you promised to give me readsize bytes! */
     goto short_read;
 
-  if (!gst_dp_validate_header (header_length, header))
+  GST_DEBUG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (buf));
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+select_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("select failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+cancelled:
+  {
+    GST_DEBUG_OBJECT (this, "Select was cancelled");
+    return GST_FLOW_WRONG_STATE;
+  }
+ioctl_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("ioctl failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+read_error:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("read failed: %s", g_strerror (errno)));
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return GST_FLOW_ERROR;
+  }
+short_read:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("short read: wanted %d bytes, got %d", readsize, bytes_read));
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return GST_FLOW_ERROR;
+  }
+}
+
+/* read a buffer from the given socket
+ * returns:
+ * - a GstBuffer in which data should be read
+ * - NULL, indicating a connection close or an error, to be handled with
+ *         EOS
+ */
+GstFlowReturn
+gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
+    GstBuffer ** buf)
+{
+  GstFlowReturn ret;
+  guint8 *header = NULL;
+
+  GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header",
+      GST_DP_HEADER_LENGTH);
+
+  *buf = NULL;
+  header = g_malloc (GST_DP_HEADER_LENGTH);
+
+  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
+      cancel_fd);
+
+  if (ret != GST_FLOW_OK)
+    goto header_read_error;
+
+  if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
     goto validate_error;
 
+  if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER)
+    goto is_not_buffer;
+
   GST_LOG_OBJECT (this, "validated buffer packet header");
 
-  buffer = gst_dp_buffer_from_header (header_length, header);
+  *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header);
+
   g_free (header);
 
-  GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
+  ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
+      GST_BUFFER_SIZE (*buf), cancel_fd);
 
-  return buffer;
+  if (ret != GST_FLOW_OK)
+    goto data_read_error;
+
+  return GST_FLOW_OK;
 
   /* ERRORS */
-read_error:
-  {
-    if (ret == 0) {
-      /* if we read 0 bytes, and we're blocking, we hit eos */
-      GST_DEBUG ("blocking read returns 0, returning NULL");
-      g_free (header);
-      return NULL;
-    } else {
-      GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-      g_free (header);
-      return NULL;
-    }
-  }
-short_read:
+header_read_error:
   {
-    GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
-    g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
-    return NULL;
+    g_free (header);
+    return ret;
   }
 validate_error:
   {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP buffer packet header does not validate"));
     g_free (header);
-    return NULL;
+    return GST_FLOW_ERROR;
+  }
+is_not_buffer:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP packet contains something that is not a buffer"));
+    g_free (header);
+    return GST_FLOW_ERROR;
+  }
+data_read_error:
+  {
+    gst_buffer_unref (*buf);
+    *buf = NULL;
+    return ret;
   }
 }
 
-/* read the GDP caps packet from the given socket
- * returns the caps, or NULL in case of an error */
-GstCaps *
-gst_tcp_gdp_read_caps (GstElement * this, int socket)
+GstFlowReturn
+gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
+    GstCaps ** caps)
 {
-  size_t header_length = GST_DP_HEADER_LENGTH;
-  size_t readsize;
+  GstFlowReturn ret;
   guint8 *header = NULL;
   guint8 *payload = NULL;
-  ssize_t ret;
-  GstCaps *caps;
-  gchar *string;
+  size_t payload_length;
 
-  header = g_malloc (header_length);
-  readsize = header_length;
-
-  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
-  if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
-    goto read_error;
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header",
+      GST_DP_HEADER_LENGTH);
 
-  if (ret != readsize)
-    goto short_read;
+  *caps = NULL;
+  header = g_malloc (GST_DP_HEADER_LENGTH);
 
-  if (!gst_dp_validate_header (header_length, header))
-    goto validate_error;
+  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
+      cancel_fd);
 
-  readsize = gst_dp_header_payload_length (header);
-  payload = g_malloc (readsize);
+  if (ret != GST_FLOW_OK)
+    goto header_read_error;
 
-  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
-  if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
-    goto socket_read_error;
+  if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
+    goto header_validate_error;
 
   if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
     goto is_not_caps;
 
-  g_assert (ret == readsize);
+  GST_LOG_OBJECT (this, "validated caps packet header");
+
+  payload_length = gst_dp_header_payload_length (header);
+  payload = g_malloc (payload_length);
+
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload",
+      payload_length);
 
-  if (!gst_dp_validate_payload (readsize, header, payload))
-    goto packet_validate_error;
+  ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
 
-  caps = gst_dp_caps_from_packet (header_length, header, payload);
-  string = gst_caps_to_string (caps);
-  GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
-  g_free (string);
+  if (ret != GST_FLOW_OK)
+    goto payload_read_error;
+
+  if (!gst_dp_validate_payload (payload_length, header, payload))
+    goto payload_validate_error;
+
+  *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload);
+
+  GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps);
 
   g_free (header);
   g_free (payload);
 
-  return caps;
+  return GST_FLOW_OK;
 
   /* ERRORS */
-read_error:
+header_read_error:
   {
-    if (ret < 0) {
-      g_free (header);
-      GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-      return NULL;
-    }
-    if (ret == 0) {
-      GST_WARNING_OBJECT (this, "read returned EOF");
-      return NULL;
-    }
-  }
-short_read:
-  {
-    GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
-        readsize, ret);
-    return NULL;
+    g_free (header);
+    return ret;
   }
-validate_error:
+header_validate_error:
   {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP caps packet header does not validate"));
     g_free (header);
-    return NULL;
+    return GST_FLOW_ERROR;
   }
-socket_read_error:
+is_not_caps:
   {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP packet contains something that is not a caps"));
     g_free (header);
-    g_free (payload);
-    return NULL;
+    return GST_FLOW_ERROR;
   }
-is_not_caps:
+payload_read_error:
   {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-        ("Header read doesn't describe CAPS payload"));
     g_free (header);
     g_free (payload);
-    return NULL;
+    return ret;
   }
-packet_validate_error:
+payload_validate_error:
   {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP caps packet payload does not validate"));
     g_free (header);
     g_free (payload);
-    return NULL;
+    return GST_FLOW_ERROR;
   }
 }
 
index 818def8..1c2c1bc 100644 (file)
@@ -42,13 +42,15 @@ typedef enum
 gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
 
 gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
-gint gst_tcp_socket_read (int socket, void *buf, size_t count);
 
 void gst_tcp_socket_close (int *socket);
 
-GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket);
-GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
-GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket);
+GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+
+GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
+
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
 
 gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
 gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
index 7f950ad..c422704 100644 (file)
@@ -212,9 +212,7 @@ static GstFlowReturn
 gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPClientSrc *src;
-  size_t readsize;
-  int ret;
-  GstBuffer *buf = NULL;
+  GstFlowReturn ret;
 
   src = GST_TCPCLIENTSRC (psrc);
 
@@ -225,35 +223,13 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   /* read the buffer header if we're using a protocol */
   switch (src->protocol) {
-      fd_set testfds;
-
     case GST_TCP_PROTOCOL_NONE:
-      /* do a blocking select on the socket */
-      FD_ZERO (&testfds);
-      FD_SET (src->sock_fd, &testfds);
-
-      /* no action (0) is an error too in our case */
-      if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
-        goto select_error;
-
-      /* ask how much is available for reading on the socket */
-      if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
-        goto ioctl_error;
-
-      GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
-
-      buf = gst_buffer_new_and_alloc (readsize);
+      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1, outbuf);
       break;
 
     case GST_TCP_PROTOCOL_GDP:
-      if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd)))
-        goto hit_eos;
-
-      GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
-          buf);
-
-      /* use this new buffer to read data into */
-      readsize = GST_BUFFER_SIZE (buf);
+      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, -1,
+          outbuf);
       break;
     default:
       /* need to assert as buf == NULL */
@@ -261,67 +237,24 @@ gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
       break;
   }
 
-  GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
-  if ((ret =
-          gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf),
-              readsize)) < 0)
-    goto read_error;
-
-  /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0)
-    goto zero_read;
-
-  readsize = ret;
-  GST_BUFFER_SIZE (buf) = readsize;
-
-  src->curoffset += readsize;
-
-  GST_LOG_OBJECT (src,
-      "Returning buffer from _get of size %d, ts %"
-      GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
-      ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
-      GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
-      GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
-
-  gst_buffer_set_caps (buf, src->caps);
-
-  *outbuf = buf;
+  if (ret == GST_FLOW_OK) {
+    GST_LOG_OBJECT (src,
+        "Returning buffer from _get of size %d, ts %"
+        GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+        ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+        GST_BUFFER_SIZE (*outbuf),
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
+        GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+
+    gst_buffer_set_caps (*outbuf, src->caps);
+  }
 
-  return GST_FLOW_OK;
+  return ret;
 
-  /* ERRORS */
 wrong_state:
   {
-    GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
-    return GST_FLOW_WRONG_STATE;
-  }
-select_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("select failed: %s", g_strerror (errno)));
-    return GST_FLOW_ERROR;
-  }
-ioctl_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("ioctl failed: %s", g_strerror (errno)));
-    return GST_FLOW_ERROR;
-  }
-hit_eos:
-  {
-    return GST_FLOW_WRONG_STATE;
-  }
-read_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
-  }
-zero_read:
-  {
-    GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
-    gst_buffer_unref (buf);
+    GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
     return GST_FLOW_WRONG_STATE;
   }
 }
@@ -330,10 +263,7 @@ static void
 gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
-  GstTCPClientSrc *tcpclientsrc;
-
-  g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
-  tcpclientsrc = GST_TCPCLIENTSRC (object);
+  GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
 
   switch (prop_id) {
     case ARG_HOST:
@@ -361,10 +291,7 @@ static void
 gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
     GParamSpec * pspec)
 {
-  GstTCPClientSrc *tcpclientsrc;
-
-  g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
-  tcpclientsrc = GST_TCPCLIENTSRC (object);
+  GstTCPClientSrc *tcpclientsrc = GST_TCPCLIENTSRC (object);
 
   switch (prop_id) {
     case ARG_HOST:
@@ -440,18 +367,15 @@ gst_tcpclientsrc_start (GstBaseSrc * bsrc)
   if (src->protocol == GST_TCP_PROTOCOL_GDP) {
     /* if we haven't received caps yet, we should get them first */
     if (!src->caps_received) {
+      GstFlowReturn fret;
       GstCaps *caps;
 
       GST_DEBUG_OBJECT (src, "getting caps through GDP");
-      if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd)))
-        goto no_caps;
+      fret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, -1, &caps);
 
-      if (!GST_IS_CAPS (caps))
+      if (fret != GST_FLOW_OK)
         goto no_caps;
 
-      GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
-          caps);
-
       src->caps_received = TRUE;
       src->caps = caps;
     }
index 1d5aeed..0044447 100644 (file)
@@ -186,46 +186,30 @@ static GstFlowReturn
 gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPServerSrc *src;
-  size_t readsize;
-  int ret;
-  GstBuffer *buf = NULL;
-  GstCaps *caps;
+  GstFlowReturn ret;
 
   src = GST_TCPSERVERSRC (psrc);
 
-  g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN),
-      GST_FLOW_ERROR);
+  if (!GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN))
+    goto wrong_state;
+
+  GST_LOG_OBJECT (src, "asked for a buffer");
 
-  /* read the buffer header if we're using a protocol */
   switch (src->protocol) {
     case GST_TCP_PROTOCOL_NONE:
-    {
-      fd_set testfds;
-
-      /* do a blocking select on the socket */
-      FD_ZERO (&testfds);
-      FD_SET (src->client_sock_fd, &testfds);
-
-      /* no action (0) is an error too in our case */
-      if ((ret =
-              select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
-                  (fd_set *) 0, 0)) <= 0)
-        goto select_error;
-
-      /* ask how much is available for reading on the socket */
-      if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
-        goto ioctl_error;
-
-      buf = gst_buffer_new_and_alloc (readsize);
+      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
+          outbuf);
       break;
-    }
+
     case GST_TCP_PROTOCOL_GDP:
-      /* if we haven't received caps yet, we should get them first */
       if (!src->caps_received) {
+        GstCaps *caps;
         gchar *string;
 
-        if (!(caps =
-                gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd)))
+        ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd, -1,
+            &caps);
+
+        if (ret != GST_FLOW_OK)
           goto gdp_caps_read_error;
 
         src->caps_received = TRUE;
@@ -236,83 +220,46 @@ gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
       }
 
-      /* now receive the buffer header */
-      if (!(buf =
-              gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
-        goto gdp_buffer_read_error;
+      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, -1,
+          outbuf);
 
-      GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
-          buf);
+      if (ret == GST_FLOW_OK)
+        gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
 
-      /* use this new buffer to read data into */
-      readsize = GST_BUFFER_SIZE (buf);
       break;
+
     default:
-      g_warning ("Unhandled protocol type");
+      /* need to assert as buf == NULL */
+      g_assert ("Unhandled protocol type");
       break;
   }
 
-  GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
-  if ((ret =
-          gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
-              readsize)) < 0)
-    goto read_error;
-
-  /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0)
-    goto hit_eos;
-
-  readsize = ret;
-  GST_LOG_OBJECT (src, "Read %d bytes", readsize);
-  GST_BUFFER_SIZE (buf) = readsize;
-  GST_BUFFER_OFFSET (buf) = src->curoffset;
-  GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
-  src->curoffset += readsize;
-
-  *outbuf = buf;
+  if (ret == GST_FLOW_OK) {
+    GST_LOG_OBJECT (src,
+        "Returning buffer from _get of size %d, ts %"
+        GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
+        ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
+        GST_BUFFER_SIZE (*outbuf),
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
+        GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
+  }
 
-  return GST_FLOW_OK;
+  return ret;
 
-  /* ERROR */
-select_error:
+wrong_state:
   {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("select failed: %s", g_strerror (errno)));
-    return GST_FLOW_ERROR;
-  }
-ioctl_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("ioctl failed: %s", g_strerror (errno)));
-    return GST_FLOW_ERROR;
+    GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
+    return GST_FLOW_WRONG_STATE;
   }
 gdp_caps_read_error:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
         ("Could not read caps through GDP"));
-    return GST_FLOW_ERROR;
-  }
-gdp_buffer_read_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("Could not read buffer header through GDP"));
-    return GST_FLOW_ERROR;
-  }
-read_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
-  }
-hit_eos:
-  {
-    GST_DEBUG ("blocking read returns 0, EOS");
-    gst_buffer_unref (buf);
-    return GST_FLOW_WRONG_STATE;
+    return ret;
   }
 }
 
-
 static void
 gst_tcpserversrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)