tcpserversrc: Port to GIO
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Wed, 11 Jan 2012 15:06:22 +0000 (16:06 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 16 Jan 2012 10:13:37 +0000 (11:13 +0100)
gst/tcp/gsttcpserversrc.c
gst/tcp/gsttcpserversrc.h

index 3c4b5b1..42a89ed 100644 (file)
@@ -1,6 +1,8 @@
 /* GStreamer
  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) <2011> Collabora Ltd.
+ *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #include <gst/gst-i18n-plugin.h>
 #include "gsttcp.h"
 #include "gsttcpserversrc.h"
-#include <string.h>             /* memset */
-#include <unistd.h>
-#include <sys/ioctl.h>
-#include <fcntl.h>
-
 
 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
 #define GST_CAT_DEFAULT tcpserversrc_debug
@@ -52,13 +49,13 @@ GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
 #define TCP_DEFAULT_LISTEN_HOST         NULL    /* listen on all interfaces */
 #define TCP_BACKLOG                     1       /* client connection queue */
 
+#define MAX_READ_SIZE                   4 * 1024
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
     GST_STATIC_CAPS_ANY);
 
-
 enum
 {
   PROP_0,
@@ -69,12 +66,12 @@ enum
 #define gst_tcp_server_src_parent_class parent_class
 G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
 
-
 static void gst_tcp_server_src_finalize (GObject * gobject);
 
 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
+static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
     GstBuffer ** buf);
 
@@ -119,6 +116,7 @@ gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
   gstbasesrc_class->start = gst_tcp_server_src_start;
   gstbasesrc_class->stop = gst_tcp_server_src_stop;
   gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
+  gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
 
   gstpush_src_class->create = gst_tcp_server_src_create;
 
@@ -131,8 +129,9 @@ gst_tcp_server_src_init (GstTCPServerSrc * src)
 {
   src->server_port = TCP_DEFAULT_PORT;
   src->host = g_strdup (TCP_DEFAULT_HOST);
-  src->server_sock_fd.fd = -1;
-  src->client_sock_fd.fd = -1;
+  src->server_socket = NULL;
+  src->client_socket = NULL;
+  src->cancellable = g_cancellable_new ();
 
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
 }
@@ -142,7 +141,18 @@ gst_tcp_server_src_finalize (GObject * gobject)
 {
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
 
+  if (src->cancellable)
+    g_object_unref (src->cancellable);
+  src->cancellable = NULL;
+  if (src->server_socket)
+    g_object_unref (src->server_socket);
+  src->server_socket = NULL;
+  if (src->client_socket)
+    g_object_unref (src->client_socket);
+  src->client_socket = NULL;
+
   g_free (src->host);
+  src->host = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (gobject);
 }
@@ -152,51 +162,56 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPServerSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
+  gssize rret;
+  GError *err = NULL;
+  guint8 *data;
 
   src = GST_TCP_SERVER_SRC (psrc);
 
   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
     goto wrong_state;
 
-restart:
-  if (src->client_sock_fd.fd >= 0) {
-    /* if we have a client, wait for read */
-    gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE);
-    gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE);
-  } else {
-    /* else wait on server socket for connections */
-    gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE);
-  }
-
-  /* no action (0) is an error too in our case */
-  if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) {
-    if (ret == -1 && errno == EBUSY)
-      goto select_cancelled;
-    else
-      goto select_error;
-  }
-
-  /* if we have no client socket we can accept one now */
-  if (src->client_sock_fd.fd < 0) {
-    if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) {
-      if ((src->client_sock_fd.fd =
-              accept (src->server_sock_fd.fd,
-                  (struct sockaddr *) &src->client_sin,
-                  &src->client_sin_len)) == -1)
-        goto accept_error;
-
-      gst_poll_add_fd (src->fdset, &src->client_sock_fd);
-    }
-    /* and restart now to poll the socket. */
-    goto restart;
+  if (!src->client_socket) {
+    /* wait on server socket for connections */
+    src->client_socket =
+        g_socket_accept (src->server_socket, src->cancellable, &err);
+    if (!src->client_socket)
+      goto accept_error;
+    /* now read from the socket. */
   }
 
+  /* if we have a client, wait for read */
   GST_LOG_OBJECT (src, "asked for a buffer");
 
-  ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
-      src->fdset, outbuf);
+  /* read the buffer header */
+  *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+  data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
+  rret =
+      g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE,
+      src->cancellable, &err);
+
+  if (rret == 0) {
+    GST_DEBUG_OBJECT (src, "Connection closed");
+    ret = GST_FLOW_EOS;
+    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unref (*outbuf);
+    *outbuf = NULL;
+  } else if (ret < 0) {
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      ret = GST_FLOW_WRONG_STATE;
+      GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
+    } else {
+      ret = GST_FLOW_ERROR;
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Failed to read from socket: %s", err->message));
+    }
+    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unref (*outbuf);
+    *outbuf = NULL;
+  } else {
+    ret = GST_FLOW_OK;
+    gst_buffer_unmap (*outbuf, data, rret);
 
-  if (ret == GST_FLOW_OK) {
     GST_LOG_OBJECT (src,
         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@@ -206,6 +221,7 @@ restart:
         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
   }
+  g_clear_error (&err);
 
   return ret;
 
@@ -214,21 +230,15 @@ wrong_state:
     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
     return GST_FLOW_WRONG_STATE;
   }
-select_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("Select error: %s", g_strerror (errno)));
-    return GST_FLOW_ERROR;
-  }
-select_cancelled:
-  {
-    GST_DEBUG_OBJECT (src, "select canceled");
-    return GST_FLOW_WRONG_STATE;
-  }
 accept_error:
   {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-        ("Could not accept client on server socket: %s", g_strerror (errno)));
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to accept client: %s", err->message));
+    }
+    g_clear_error (&err);
     return GST_FLOW_ERROR;
   }
 }
@@ -271,7 +281,6 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id,
     case PROP_PORT:
       g_value_set_int (value, tcpserversrc->server_port);
       break;
-
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -282,104 +291,108 @@ gst_tcp_server_src_get_property (GObject * object, guint prop_id,
 static gboolean
 gst_tcp_server_src_start (GstBaseSrc * bsrc)
 {
-  int ret;
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
-
-  /* reset caps_received flag */
-  src->caps_received = FALSE;
+  GError *err = NULL;
+  GInetAddress *addr;
+  GSocketAddress *saddr;
 
   /* create the server listener socket */
-  if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
-    goto socket_error;
-
-  GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
-      src->server_sock_fd.fd);
-
-  /* make address reusable */
-  ret = 1;
-  if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
-          sizeof (int)) < 0)
-    goto sock_opt;
-
-  /* name the socket */
-  memset (&src->server_sin, 0, sizeof (src->server_sin));
-  src->server_sin.sin_family = AF_INET; /* network socket */
-  src->server_sin.sin_port = htons (src->server_port);  /* on port */
-  if (src->host) {
-    gchar *host;
-
-    if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
-      goto host_error;
-    src->server_sin.sin_addr.s_addr = inet_addr (host);
-    g_free (host);
-  } else
-    src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
+  src->server_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
+      G_SOCKET_PROTOCOL_TCP, &err);
+  if (!src->server_socket)
+    goto no_socket;
+
+  GST_DEBUG_OBJECT (src, "opened receiving server socket");
+
+  /* look up name if we need to */
+  addr = g_inet_address_new_from_string (src->host);
+  if (!addr) {
+    GResolver *resolver = g_resolver_get_default ();
+    GList *results;
+
+    results =
+        g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
+    if (!results)
+      goto name_resolve;
+    addr = G_INET_ADDRESS (g_object_ref (results->data));
+
+    g_resolver_free_addresses (results);
+    g_object_unref (resolver);
+  }
+#ifndef GST_DISABLE_GST_DEBUG
+  {
+    gchar *ip = g_inet_address_to_string (addr);
+
+    GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+    g_free (ip);
+  }
+#endif
 
   /* bind it */
+  saddr = g_inet_socket_address_new (addr, src->server_port);
   GST_DEBUG_OBJECT (src, "binding server socket to address");
-  if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin,
-              sizeof (src->server_sin))) < 0)
-    goto bind_error;
+  if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
+    goto bind_failed;
 
-  GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
-      src->server_sock_fd.fd, TCP_BACKLOG);
+  GST_DEBUG_OBJECT (src, "listening on server socket");
 
-  if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1)
-    goto listen_error;
+  g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
 
-  /* create an fdset to keep track of our file descriptors */
-  if ((src->fdset = gst_poll_new (TRUE)) == NULL)
-    goto socket_pair;
-
-  gst_poll_add_fd (src->fdset, &src->server_sock_fd);
-
-  GST_DEBUG_OBJECT (src, "received client");
+  if (!g_socket_listen (src->server_socket, &err))
+    goto listen_failed;
 
   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
 
   return TRUE;
 
   /* ERRORS */
-socket_error:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
-    return FALSE;
-  }
-sock_opt:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("Could not setsockopt: %s", g_strerror (errno)));
-    gst_tcp_socket_close (&src->server_sock_fd);
-    return FALSE;
-  }
-host_error:
+no_socket:
   {
-    gst_tcp_socket_close (&src->server_sock_fd);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+        ("Failed to create socket: %s", err->message));
+    g_clear_error (&err);
     return FALSE;
   }
-bind_error:
+name_resolve:
   {
-    gst_tcp_socket_close (&src->server_sock_fd);
-    switch (errno) {
-      default:
-        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-            ("bind failed: %s", g_strerror (errno)));
-        break;
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled name resolval");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to resolve host '%s': %s", src->host, err->message));
     }
+    g_clear_error (&err);
+    gst_tcp_server_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
-listen_error:
+bind_failed:
   {
-    gst_tcp_socket_close (&src->server_sock_fd);
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-        ("Could not listen on server socket: %s", g_strerror (errno)));
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled binding");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
+              err->message));
+    }
+    g_clear_error (&err);
+    g_object_unref (saddr);
+    g_object_unref (addr);
+    gst_tcp_server_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
-socket_pair:
+listen_failed:
   {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        GST_ERROR_SYSTEM);
-    gst_tcp_socket_close (&src->server_sock_fd);
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      GST_DEBUG_OBJECT (src, "Cancelled listening");
+    } else {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+          ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
+              err->message));
+    }
+    g_clear_error (&err);
+    g_object_unref (saddr);
+    g_object_unref (addr);
+    gst_tcp_server_src_stop (GST_BASE_SRC (src));
     return FALSE;
   }
 }
@@ -388,12 +401,29 @@ static gboolean
 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
 {
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
+  GError *err = NULL;
 
-  gst_poll_free (src->fdset);
-  src->fdset = NULL;
+  if (src->client_socket) {
+    GST_DEBUG_OBJECT (src, "closing socket");
 
-  gst_tcp_socket_close (&src->server_sock_fd);
-  gst_tcp_socket_close (&src->client_sock_fd);
+    if (!g_socket_close (src->client_socket, &err)) {
+      GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+      g_clear_error (&err);
+    }
+    g_object_unref (src->client_socket);
+    src->client_socket = NULL;
+  }
+
+  if (src->server_socket) {
+    GST_DEBUG_OBJECT (src, "closing socket");
+
+    if (!g_socket_close (src->server_socket, &err)) {
+      GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+      g_clear_error (&err);
+    }
+    g_object_unref (src->server_socket);
+    src->server_socket = NULL;
+  }
 
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
 
@@ -406,7 +436,17 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
 {
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
 
-  gst_poll_set_flushing (src->fdset, TRUE);
+  g_cancellable_cancel (src->cancellable);
+
+  return TRUE;
+}
+
+static gboolean
+gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
+{
+  GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
+
+  g_cancellable_reset (src->cancellable);
 
   return TRUE;
 }
index f5e80e2..326c4a5 100644 (file)
 
 #include <gst/gst.h>
 #include <gst/base/gstpushsrc.h>
+#include <gio/gio.h>
 
 G_END_DECLS
 
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
 #include "gsttcp.h"
 
-#include <fcntl.h>
-
 #define GST_TYPE_TCP_SERVER_SRC \
   (gst_tcp_server_src_get_type())
 #define GST_TCP_SERVER_SRC(obj) \
@@ -53,9 +45,9 @@ typedef struct _GstTCPServerSrc GstTCPServerSrc;
 typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass;
 
 typedef enum {
-  GST_TCP_SERVER_SRC_OPEN       = (GST_ELEMENT_FLAG_LAST << 0),
+  GST_TCP_SERVER_SRC_OPEN       = (GST_BASE_SRC_FLAG_LAST << 0),
 
-  GST_TCP_SERVER_SRC_FLAG_LAST  = (GST_ELEMENT_FLAG_LAST << 2)
+  GST_TCP_SERVER_SRC_FLAG_LAST  = (GST_BASE_SRC_FLAG_LAST << 2)
 } GstTCPServerSrcFlags;
 
 struct _GstTCPServerSrc {
@@ -64,17 +56,10 @@ struct _GstTCPServerSrc {
   /* server information */
   int server_port;
   gchar *host;
-  struct sockaddr_in server_sin;
-  GstPollFD server_sock_fd;
-
-  /* client information */
-  struct sockaddr_in client_sin;
-  socklen_t client_sin_len;
-  GstPollFD client_sock_fd;
-
-  GstPoll *fdset;
 
-  gboolean caps_received;      /* if we have received caps yet */
+  GCancellable *cancellable;
+  GSocket *server_socket;
+  GSocket *client_socket;
 };
 
 struct _GstTCPServerSrcClass {