tcpclientsrc: Port to GIO
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Wed, 11 Jan 2012 14:09:46 +0000 (15:09 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 16 Jan 2012 10:13:31 +0000 (11:13 +0100)
gst/tcp/Makefile.am
gst/tcp/gsttcpclientsrc.c
gst/tcp/gsttcpclientsrc.h

index 6676ef4..d51f6ec 100644 (file)
@@ -23,9 +23,9 @@ libgsttcp_la_SOURCES = \
 nodist_libgsttcp_la_SOURCES = \
        $(built_sources)
 
-libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS)
+libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(GST_CFLAGS) $(GIO_CFLAGS)
 libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
-libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS)
+libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS)
 libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static
 
 noinst_HEADERS = \
index fe34a9e..d761876 100644 (file)
@@ -1,6 +1,8 @@
 /* GStreamer
  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2011> Collabora Ltd.
+ *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #endif
 
 #include <gst/gst-i18n-plugin.h>
-#include "gsttcp.h"
 #include "gsttcpclientsrc.h"
-#include <string.h>             /* memset */
-#include <unistd.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-
+#include "gsttcp.h"
 
 GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
 #define GST_CAT_DEFAULT tcpclientsrc_debug
@@ -137,8 +134,8 @@ gst_tcp_client_src_init (GstTCPClientSrc * this)
 {
   this->port = TCP_DEFAULT_PORT;
   this->host = g_strdup (TCP_DEFAULT_HOST);
-  this->sock_fd.fd = -1;
-  this->caps = NULL;
+  this->socket = NULL;
+  this->cancellable = g_cancellable_new ();
 
   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
 }
@@ -148,7 +145,14 @@ gst_tcp_client_src_finalize (GObject * gobject)
 {
   GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
 
+  if (this->cancellable)
+    g_object_unref (this->cancellable);
+  this->cancellable = NULL;
+  if (this->socket)
+    g_object_unref (this->socket);
+  this->socket = NULL;
   g_free (this->host);
+  this->host = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (gobject);
 }
@@ -161,15 +165,7 @@ gst_tcp_client_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
 
   src = GST_TCP_CLIENT_SRC (bsrc);
 
-  if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
-    caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
-  else if (src->caps && filter)
-    caps =
-        gst_caps_intersect_full (filter, src->caps, GST_CAPS_INTERSECT_FIRST);
-  else if (src->caps)
-    caps = gst_caps_copy (src->caps);
-  else
-    caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
+  caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
 
   GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
   g_assert (GST_IS_CAPS (caps));
@@ -181,6 +177,9 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPClientSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
+  gssize rret;
+  GError *err = NULL;
+  guint8 *data;
 
   src = GST_TCP_CLIENT_SRC (psrc);
 
@@ -190,10 +189,34 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GST_LOG_OBJECT (src, "asked for a buffer");
 
   /* read the buffer header */
-  ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
-      src->fdset, outbuf);
+  *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+  data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
+  rret =
+      g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
+      src->cancellable, &err);
+
+  if (rret == 0) {
+    GST_DEBUG_OBJECT (src, "Connection closed");
+    ret = GST_FLOW_EOS;
+    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unref (*outbuf);
+    *outbuf = NULL;
+  } else if (ret < 0) {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      ret = GST_FLOW_WRONG_STATE;
+      GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
+    } else {
+      ret = GST_FLOW_ERROR;
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Failed to read from socket: %s", err->message));
+    }
+    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unref (*outbuf);
+    *outbuf = NULL;
+  } else {
+    ret = GST_FLOW_OK;
+    gst_buffer_unmap (*outbuf, data, rret);
 
-  if (ret == GST_FLOW_OK) {
     GST_LOG_OBJECT (src,
         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@@ -203,6 +226,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
   }
+  g_clear_error (&err);
 
   return ret;
 
@@ -251,7 +275,6 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
     case PROP_PORT:
       g_value_set_int (value, tcpclientsrc->port);
       break;
-
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -262,79 +285,90 @@ gst_tcp_client_src_get_property (GObject * object, guint prop_id,
 static gboolean
 gst_tcp_client_src_start (GstBaseSrc * bsrc)
 {
-  int ret;
-  gchar *ip;
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
-
-  if ((src->fdset = gst_poll_new (TRUE)) == NULL)
-    goto socket_pair;
+  GError *err = NULL;
+  GInetAddress *addr;
+  GSocketAddress *saddr;
 
   /* create receiving client socket */
   GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
       src->host, src->port);
 
-  if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+  src->socket =
+      g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
+      G_SOCKET_PROTOCOL_TCP, &err);
+  if (!src->socket)
     goto no_socket;
 
-  GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
-      src->sock_fd.fd);
+  GST_DEBUG_OBJECT (src, "opened receiving client socket");
   GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
 
   /* look up name if we need to */
-  if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
-    goto name_resolv;
+  addr = g_inet_address_new_from_string (src->host);
+  if (!addr) {
+    GResolver *resolver = g_resolver_get_default ();
+    GList *results;
+
+    results =
+        g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
+    if (!results)
+      goto name_resolve;
+    addr = G_INET_ADDRESS (g_object_ref (results->data));
+
+    g_resolver_free_addresses (results);
+    g_object_unref (resolver);
+  }
+#ifndef GST_DISABLE_GST_DEBUG
+  {
+    gchar *ip = g_inet_address_to_string (addr);
 
-  GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+    GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+    g_free (ip);
+  }
+#endif
 
   /* connect to server */
-  memset (&src->server_sin, 0, sizeof (src->server_sin));
-  src->server_sin.sin_family = AF_INET; /* network socket */
-  src->server_sin.sin_port = htons (src->port); /* on port */
-  src->server_sin.sin_addr.s_addr = inet_addr (ip);     /* on host ip */
-  g_free (ip);
-
-  GST_DEBUG_OBJECT (src, "connecting to server");
-  ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
-      sizeof (src->server_sin));
-  if (ret)
+  saddr = g_inet_socket_address_new (addr, src->port);
+  if (!g_socket_connect (src->socket, saddr, src->cancellable, &err))
     goto connect_failed;
 
-  /* add the socket to the poll */
-  gst_poll_add_fd (src->fdset, &src->sock_fd);
-  gst_poll_fd_ctl_read (src->fdset, &src->sock_fd, TRUE);
+  g_object_unref (saddr);
+  g_object_unref (addr);
 
   return TRUE;
 
-socket_pair:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        GST_ERROR_SYSTEM);
-    return FALSE;
-  }
 no_socket:
   {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+        ("Failed to create socket: %s", err->message));
+    g_clear_error (&err);
     return FALSE;
   }
-name_resolv:
+name_resolve:
   {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled name resolval");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to resolve host '%s': %s", src->host, err->message));
+    }
+    g_clear_error (&err);
     gst_tcp_client_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
 connect_failed:
   {
-    gst_tcp_client_src_stop (GST_BASE_SRC (src));
-    switch (errno) {
-      case ECONNREFUSED:
-        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
-            (_("Connection to %s:%d refused."), src->host, src->port), (NULL));
-        break;
-      default:
-        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-            ("connect to %s:%d failed: %s", src->host, src->port,
-                g_strerror (errno)));
-        break;
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled connecting");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to connect to host '%s:%d': %s", src->host, src->port,
+              err->message));
     }
+    g_clear_error (&err);
+    g_object_unref (saddr);
+    g_object_unref (addr);
+    gst_tcp_client_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
 }
@@ -346,22 +380,21 @@ static gboolean
 gst_tcp_client_src_stop (GstBaseSrc * bsrc)
 {
   GstTCPClientSrc *src;
+  GError *err = NULL;
 
   src = GST_TCP_CLIENT_SRC (bsrc);
 
-  GST_DEBUG_OBJECT (src, "closing socket");
+  if (src->socket) {
+    GST_DEBUG_OBJECT (src, "closing socket");
 
-  if (src->fdset != NULL) {
-    gst_poll_free (src->fdset);
-    src->fdset = NULL;
+    if (!g_socket_close (src->socket, &err)) {
+      GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+      g_clear_error (&err);
+    }
+    g_object_unref (src->socket);
+    src->socket = NULL;
   }
 
-  gst_tcp_socket_close (&src->sock_fd);
-  src->caps_received = FALSE;
-  if (src->caps) {
-    gst_caps_unref (src->caps);
-    src->caps = NULL;
-  }
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
 
   return TRUE;
@@ -374,7 +407,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
   GST_DEBUG_OBJECT (src, "set to flushing");
-  gst_poll_set_flushing (src->fdset, TRUE);
+  g_cancellable_cancel (src->cancellable);
 
   return TRUE;
 }
@@ -386,7 +419,7 @@ gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc)
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
   GST_DEBUG_OBJECT (src, "unset flushing");
-  gst_poll_set_flushing (src->fdset, FALSE);
+  g_cancellable_reset (src->cancellable);
 
   return TRUE;
 }
index 77d5702..6f17506 100644 (file)
 #include <gst/gst.h>
 #include <gst/base/gstpushsrc.h>
 
-G_BEGIN_DECLS
-
-#include <netdb.h>                        /* sockaddr_in */
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>                   /* sockaddr_in */
-#include <unistd.h>
+#include <gio/gio.h>
 
-#include "gsttcp.h"
+G_BEGIN_DECLS
 
 #define GST_TYPE_TCP_CLIENT_SRC \
   (gst_tcp_client_src_get_type())
@@ -50,9 +44,9 @@ typedef struct _GstTCPClientSrc GstTCPClientSrc;
 typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass;
 
 typedef enum {
-  GST_TCP_CLIENT_SRC_OPEN       = (GST_ELEMENT_FLAG_LAST << 0),
+  GST_TCP_CLIENT_SRC_OPEN       = (GST_BASE_SRC_FLAG_LAST << 0),
 
-  GST_TCP_CLIENT_SRC_FLAG_LAST  = (GST_ELEMENT_FLAG_LAST << 2)
+  GST_TCP_CLIENT_SRC_FLAG_LAST  = (GST_BASE_SRC_FLAG_LAST << 2)
 } GstTCPClientSrcFlags;
 
 struct _GstTCPClientSrc {
@@ -61,14 +55,10 @@ struct _GstTCPClientSrc {
   /* server information */
   int port;
   gchar *host;
-  struct sockaddr_in server_sin;
 
   /* socket */
-  GstPollFD sock_fd;
-  GstPoll *fdset;
-
-  gboolean caps_received;      /* if we have received caps yet */
-  GstCaps *caps;
+  GSocket *socket;
+  GCancellable *cancellable;
 };
 
 struct _GstTCPClientSrcClass {