subparse: fix off by one offset calculation
[platform/upstream/gstreamer.git] / gst / tcp / gsttcpclientsrc.c
index 2de0bce..d3668b9 100644 (file)
@@ -1,6 +1,8 @@
 /* GStreamer
  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2011> Collabora Ltd.
+ *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -14,8 +16,8 @@
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
@@ -28,7 +30,7 @@
  * # server:
  * nc -l -p 3000
  * # client:
- * gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2
+ * gst-launch tcpclientsrc port=3000 ! fdsink fd=2
  * ]| everything you type in the server is shown on the client
  * </refsect2>
  */
 #endif
 
 #include <gst/gst-i18n-plugin.h>
-#include "gsttcp.h"
 #include "gsttcpclientsrc.h"
-#include <string.h>             /* memset */
-#include <unistd.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-
+#include "gsttcp.h"
 
 GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
 #define GST_CAT_DEFAULT tcpclientsrc_debug
@@ -71,7 +68,8 @@ G_DEFINE_TYPE (GstTCPClientSrc, gst_tcp_client_src, GST_TYPE_PUSH_SRC);
 
 static void gst_tcp_client_src_finalize (GObject * gobject);
 
-static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc);
+static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc,
+    GstCaps * filter);
 
 static GstFlowReturn gst_tcp_client_src_create (GstPushSrc * psrc,
     GstBuffer ** outbuf);
@@ -114,7 +112,7 @@ gst_tcp_client_src_class_init (GstTCPClientSrcClass * klass)
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&srctemplate));
 
-  gst_element_class_set_details_simple (gstelement_class,
+  gst_element_class_set_static_metadata (gstelement_class,
       "TCP client source", "Source/Network",
       "Receive data as a client over the network via TCP",
       "Thomas Vander Stichele <thomas at apestaart dot org>");
@@ -136,8 +134,8 @@ gst_tcp_client_src_init (GstTCPClientSrc * this)
 {
   this->port = TCP_DEFAULT_PORT;
   this->host = g_strdup (TCP_DEFAULT_HOST);
-  this->sock_fd.fd = -1;
-  this->caps = NULL;
+  this->socket = NULL;
+  this->cancellable = g_cancellable_new ();
 
   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
 }
@@ -147,25 +145,28 @@ gst_tcp_client_src_finalize (GObject * gobject)
 {
   GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
 
+  if (this->cancellable)
+    g_object_unref (this->cancellable);
+  this->cancellable = NULL;
+  if (this->socket)
+    g_object_unref (this->socket);
+  this->socket = NULL;
   g_free (this->host);
+  this->host = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (gobject);
 }
 
 static GstCaps *
-gst_tcp_client_src_getcaps (GstBaseSrc * bsrc)
+gst_tcp_client_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
 {
   GstTCPClientSrc *src;
   GstCaps *caps = NULL;
 
   src = GST_TCP_CLIENT_SRC (bsrc);
 
-  if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
-    caps = gst_caps_new_any ();
-  else if (src->caps)
-    caps = gst_caps_copy (src->caps);
-  else
-    caps = gst_caps_new_any ();
+  caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
+
   GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
   g_assert (GST_IS_CAPS (caps));
   return caps;
@@ -176,6 +177,10 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPClientSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
+  gssize rret;
+  GError *err = NULL;
+  GstMapInfo map;
+  gssize avail, read;
 
   src = GST_TCP_CLIENT_SRC (psrc);
 
@@ -185,28 +190,107 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GST_LOG_OBJECT (src, "asked for a buffer");
 
   /* read the buffer header */
-  ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
-      src->fdset, outbuf);
+  avail = g_socket_get_available_bytes (src->socket);
+  if (avail < 0) {
+    goto get_available_error;
+  } else if (avail == 0) {
+    GIOCondition condition;
+
+    if (!g_socket_condition_wait (src->socket,
+            G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+      goto select_error;
+
+    condition =
+        g_socket_condition_check (src->socket,
+        G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+    if ((condition & G_IO_ERR)) {
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Socket in error state"));
+      *outbuf = NULL;
+      ret = GST_FLOW_ERROR;
+      goto done;
+    } else if ((condition & G_IO_HUP)) {
+      GST_DEBUG_OBJECT (src, "Connection closed");
+      *outbuf = NULL;
+      ret = GST_FLOW_EOS;
+      goto done;
+    }
+    avail = g_socket_get_available_bytes (src->socket);
+    if (avail < 0)
+      goto get_available_error;
+  }
+
+  if (avail > 0) {
+    read = MIN (avail, MAX_READ_SIZE);
+    *outbuf = gst_buffer_new_and_alloc (read);
+    gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
+    rret =
+        g_socket_receive (src->socket, (gchar *) map.data, read,
+        src->cancellable, &err);
+  } else {
+    /* Connection closed */
+    *outbuf = NULL;
+    read = 0;
+    rret = 0;
+  }
+
+  if (rret == 0) {
+    GST_DEBUG_OBJECT (src, "Connection closed");
+    ret = GST_FLOW_EOS;
+    if (*outbuf) {
+      gst_buffer_unmap (*outbuf, &map);
+      gst_buffer_unref (*outbuf);
+    }
+    *outbuf = NULL;
+  } else if (rret < 0) {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      ret = GST_FLOW_FLUSHING;
+      GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
+    } else {
+      ret = GST_FLOW_ERROR;
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Failed to read from socket: %s", err->message));
+    }
+    gst_buffer_unmap (*outbuf, &map);
+    gst_buffer_unref (*outbuf);
+    *outbuf = NULL;
+  } else {
+    ret = GST_FLOW_OK;
+    gst_buffer_unmap (*outbuf, &map);
+    gst_buffer_resize (*outbuf, 0, rret);
 
-  if (ret == GST_FLOW_OK) {
     GST_LOG_OBJECT (src,
-        "Returning buffer from _get of size %d, ts %"
+        "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
         gst_buffer_get_size (*outbuf),
         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
-
-    gst_buffer_set_caps (*outbuf, src->caps);
   }
+  g_clear_error (&err);
 
+done:
   return ret;
 
+select_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Select failed: %s", err->message));
+    g_clear_error (&err);
+    return GST_FLOW_ERROR;
+  }
+get_available_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Failed to get available bytes from socket"));
+    return GST_FLOW_ERROR;
+  }
 wrong_state:
   {
     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
-    return GST_FLOW_WRONG_STATE;
+    return GST_FLOW_FLUSHING;
   }
 }
 
@@ -248,7 +332,6 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
     case PROP_PORT:
       g_value_set_int (value, tcpclientsrc->port);
       break;
-
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -259,79 +342,93 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
 static gboolean
 gst_tcp_client_src_start (GstBaseSrc * bsrc)
 {
-  int ret;
-  gchar *ip;
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
+  GError *err = NULL;
+  GInetAddress *addr;
+  GSocketAddress *saddr;
+  GResolver *resolver;
+
+  /* look up name if we need to */
+  addr = g_inet_address_new_from_string (src->host);
+  if (!addr) {
+    GList *results;
+
+    resolver = g_resolver_get_default ();
 
-  if ((src->fdset = gst_poll_new (TRUE)) == NULL)
-    goto socket_pair;
+    results =
+        g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
+    if (!results)
+      goto name_resolve;
+    addr = G_INET_ADDRESS (g_object_ref (results->data));
+
+    g_resolver_free_addresses (results);
+    g_object_unref (resolver);
+  }
+#ifndef GST_DISABLE_GST_DEBUG
+  {
+    gchar *ip = g_inet_address_to_string (addr);
+
+    GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+    g_free (ip);
+  }
+#endif
+
+  saddr = g_inet_socket_address_new (addr, src->port);
+  g_object_unref (addr);
 
   /* create receiving client socket */
   GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
       src->host, src->port);
 
-  if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+  src->socket =
+      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
+      G_SOCKET_PROTOCOL_TCP, &err);
+  if (!src->socket)
     goto no_socket;
 
-  GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
-      src->sock_fd.fd);
+  GST_DEBUG_OBJECT (src, "opened receiving client socket");
   GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
 
-  /* look up name if we need to */
-  if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
-    goto name_resolv;
-
-  GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
-
   /* connect to server */
-  memset (&src->server_sin, 0, sizeof (src->server_sin));
-  src->server_sin.sin_family = AF_INET; /* network socket */
-  src->server_sin.sin_port = htons (src->port); /* on port */
-  src->server_sin.sin_addr.s_addr = inet_addr (ip);     /* on host ip */
-  g_free (ip);
-
-  GST_DEBUG_OBJECT (src, "connecting to server");
-  ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
-      sizeof (src->server_sin));
-  if (ret)
+  if (!g_socket_connect (src->socket, saddr, src->cancellable, &err))
     goto connect_failed;
 
-  /* add the socket to the poll */
-  gst_poll_add_fd (src->fdset, &src->sock_fd);
-  gst_poll_fd_ctl_read (src->fdset, &src->sock_fd, TRUE);
+  g_object_unref (saddr);
 
   return TRUE;
 
-socket_pair:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        GST_ERROR_SYSTEM);
-    return FALSE;
-  }
 no_socket:
   {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+        ("Failed to create socket: %s", err->message));
+    g_clear_error (&err);
+    g_object_unref (saddr);
     return FALSE;
   }
-name_resolv:
+name_resolve:
   {
-    gst_tcp_client_src_stop (GST_BASE_SRC (src));
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled name resolval");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to resolve host '%s': %s", src->host, err->message));
+    }
+    g_clear_error (&err);
+    g_object_unref (resolver);
     return FALSE;
   }
 connect_failed:
   {
-    gst_tcp_client_src_stop (GST_BASE_SRC (src));
-    switch (errno) {
-      case ECONNREFUSED:
-        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
-            (_("Connection to %s:%d refused."), src->host, src->port), (NULL));
-        break;
-      default:
-        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-            ("connect to %s:%d failed: %s", src->host, src->port,
-                g_strerror (errno)));
-        break;
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled connecting");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to connect to host '%s:%d': %s", src->host, src->port,
+              err->message));
     }
+    g_clear_error (&err);
+    g_object_unref (saddr);
+    gst_tcp_client_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
 }
@@ -343,22 +440,21 @@ static gboolean
 gst_tcp_client_src_stop (GstBaseSrc * bsrc)
 {
   GstTCPClientSrc *src;
+  GError *err = NULL;
 
   src = GST_TCP_CLIENT_SRC (bsrc);
 
-  GST_DEBUG_OBJECT (src, "closing socket");
+  if (src->socket) {
+    GST_DEBUG_OBJECT (src, "closing socket");
 
-  if (src->fdset != NULL) {
-    gst_poll_free (src->fdset);
-    src->fdset = NULL;
+    if (!g_socket_close (src->socket, &err)) {
+      GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+      g_clear_error (&err);
+    }
+    g_object_unref (src->socket);
+    src->socket = NULL;
   }
 
-  gst_tcp_socket_close (&src->sock_fd);
-  src->caps_received = FALSE;
-  if (src->caps) {
-    gst_caps_unref (src->caps);
-    src->caps = NULL;
-  }
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
 
   return TRUE;
@@ -371,7 +467,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
   GST_DEBUG_OBJECT (src, "set to flushing");
-  gst_poll_set_flushing (src->fdset, TRUE);
+  g_cancellable_cancel (src->cancellable);
 
   return TRUE;
 }
@@ -383,7 +479,7 @@ gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc)
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
   GST_DEBUG_OBJECT (src, "unset flushing");
-  gst_poll_set_flushing (src->fdset, FALSE);
+  g_cancellable_reset (src->cancellable);
 
   return TRUE;
 }