dynudpsink: Port to GIO
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Tue, 17 Jan 2012 08:32:27 +0000 (09:32 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Tue, 17 Jan 2012 08:32:27 +0000 (09:32 +0100)
gst/udp/gstdynudpsink.c
gst/udp/gstdynudpsink.h

index a9e1cf9..abcd6f4 100644 (file)
@@ -2,6 +2,8 @@
  * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
  * Copyright (C) <2006> Joni Valtanen <joni.valtanen@movial.fi>
+ * Copyright (C) <2012> Collabora Ltd.
+ *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #include "gstudp-marshal.h"
 #include "gstdynudpsink.h"
 
-#include <stdio.h>
-#include <stdlib.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <errno.h>
-#include <string.h>
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-#include <sys/types.h>
 #include <gst/net/gstnetaddressmeta.h>
 
 GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug);
@@ -58,33 +49,26 @@ enum
   LAST_SIGNAL
 };
 
-#define UDP_DEFAULT_SOCKFD             -1
-#define UDP_DEFAULT_CLOSEFD            TRUE
+#define UDP_DEFAULT_SOCKET             NULL
+#define UDP_DEFAULT_CLOSE_SOCKET       TRUE
+#define UDP_DEFAULT_FAMILY              G_SOCKET_FAMILY_IPV4
 
 enum
 {
   PROP_0,
-  PROP_SOCKFD,
-  PROP_CLOSEFD
+  PROP_SOCKET,
+  PROP_CLOSE_SOCKET,
+  PROP_FAMILY
 };
 
-#define CLOSE_IF_REQUESTED(udpctx)                                        \
-G_STMT_START {                                                            \
-  if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
-    CLOSE_SOCKET(udpctx->sock);                                           \
-    if (udpctx->sock == udpctx->sockfd)                                   \
-      udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
-  }                                                                       \
-  udpctx->sock = -1;                                                      \
-} G_STMT_END
-
 static void gst_dynudpsink_finalize (GObject * object);
 
 static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink,
     GstBuffer * buffer);
-static void gst_dynudpsink_close (GstDynUDPSink * sink);
-static GstStateChangeReturn gst_dynudpsink_change_state (GstElement * element,
-    GstStateChange transition);
+static gboolean gst_dynudpsink_stop (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_start (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink);
 
 static void gst_dynudpsink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -119,17 +103,21 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
       NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
       G_TYPE_STRING, G_TYPE_INT);
 
-  g_object_class_install_property (gobject_class, PROP_SOCKFD,
-      g_param_spec_int ("sockfd", "socket handle",
-          "Socket to use for UDP sending. (-1 == allocate)",
-          -1, G_MAXINT16, UDP_DEFAULT_SOCKFD,
+  g_object_class_install_property (gobject_class, PROP_SOCKET,
+      g_param_spec_object ("socket", "Socket",
+          "Socket to use for UDP sending. (NULL == allocate)",
+          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
+      g_param_spec_boolean ("close-socket", "Close socket",
+          "Close socket if passed as property on state change",
+          UDP_DEFAULT_CLOSE_SOCKET,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_CLOSEFD,
-      g_param_spec_boolean ("closefd", "Close sockfd",
-          "Close sockfd if passed as property on state change",
-          UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  gstelement_class->change_state = gst_dynudpsink_change_state;
+  g_object_class_install_property (gobject_class, PROP_FAMILY,
+      g_param_spec_enum ("family", "Socket family",
+          "Use IPv4 or IPv6",
+          G_TYPE_SOCKET_FAMILY, UDP_DEFAULT_FAMILY,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sink_template));
@@ -140,6 +128,10 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
       "Philippe Khalaf <burger@speedy.org>");
 
   gstbasesink_class->render = gst_dynudpsink_render;
+  gstbasesink_class->start = gst_dynudpsink_start;
+  gstbasesink_class->stop = gst_dynudpsink_stop;
+  gstbasesink_class->unlock = gst_dynudpsink_unlock;
+  gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop;
 
   GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink");
 }
@@ -147,44 +139,47 @@ gst_dynudpsink_class_init (GstDynUDPSinkClass * klass)
 static void
 gst_dynudpsink_init (GstDynUDPSink * sink)
 {
-  WSA_STARTUP (sink);
-
-  sink->sockfd = UDP_DEFAULT_SOCKFD;
-  sink->closefd = UDP_DEFAULT_CLOSEFD;
-  sink->externalfd = FALSE;
+  sink->socket = UDP_DEFAULT_SOCKET;
+  sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
+  sink->external_socket = FALSE;
+  sink->family = G_SOCKET_FAMILY_IPV4;
 
-  sink->sock = -1;
+  sink->used_socket = NULL;
+  sink->cancellable = g_cancellable_new ();
 }
 
 static void
 gst_dynudpsink_finalize (GObject * object)
 {
-  GstDynUDPSink *udpsink;
+  GstDynUDPSink *sink;
 
-  udpsink = GST_DYNUDPSINK (object);
+  sink = GST_DYNUDPSINK (object);
 
-  if (udpsink->sockfd >= 0 && udpsink->closefd)
-    CLOSE_SOCKET (udpsink->sockfd);
+  if (sink->cancellable)
+    g_object_unref (sink->cancellable);
+  sink->cancellable = NULL;
 
-  G_OBJECT_CLASS (parent_class)->finalize (object);
+  if (sink->socket)
+    g_object_unref (sink->socket);
+  sink->socket = NULL;
 
-  WSA_CLEANUP (object);
+  if (sink->used_socket)
+    g_object_unref (sink->used_socket);
+  sink->used_socket = NULL;
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static GstFlowReturn
 gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
 {
   GstDynUDPSink *sink;
-  gint ret;
+  gssize ret;
   gsize size;
   guint8 *data;
   GstNetAddressMeta *meta;
-  struct sockaddr_in theiraddr;
-  guint16 destport;
-  guint32 destaddr;
-  const guint8 *destaddr_bytes;
-
-  memset (&theiraddr, 0, sizeof (theiraddr));
+  GSocketAddress *addr;
+  GError *err = NULL;
 
   meta = gst_buffer_get_net_address_meta (buffer);
 
@@ -195,46 +190,51 @@ gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
 
   sink = GST_DYNUDPSINK (bsink);
 
+  /* let's get the address from the metadata */
+  addr = meta->addr;
+
+  if (g_socket_address_get_family (addr) != sink->family)
+    goto invalid_family;
+
   data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
 
   GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", size);
 
-  /* let's get the address from the metaata */
-  destaddr_bytes =
-      g_inet_address_to_bytes (g_inet_socket_address_get_address
-      (G_INET_SOCKET_ADDRESS (meta->addr)));
-  destaddr = GST_READ_UINT32_BE (destaddr_bytes);
-  destport =
-      g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (meta->addr));
-
-  GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %d port %d", size,
-      destaddr, destport);
-
-  theiraddr.sin_family = AF_INET;
-  theiraddr.sin_addr.s_addr = destaddr;
-  theiraddr.sin_port = destport;
-#ifdef G_OS_WIN32
-  ret = sendto (sink->sock, (char *) data, size, 0,
-#else
-  ret = sendto (sink->sock, data, size, 0,
+#ifndef GST_DISABLE_GST_DEBUG
+  {
+    gchar *host;
+
+    host =
+        g_inet_address_to_string (g_inet_socket_address_get_address
+        (G_INET_SOCKET_ADDRESS (addr)));
+    GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d", size,
+        host, g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)));
+    g_free (host);
+  }
 #endif
-      (struct sockaddr *) &theiraddr, sizeof (theiraddr));
 
+  ret =
+      g_socket_send_to (sink->used_socket, addr, (gchar *) data, size,
+      sink->cancellable, &err);
   gst_buffer_unmap (buffer, data, size);
 
-  if (ret < 0) {
-    if (errno != EINTR && errno != EAGAIN) {
-      goto send_error;
-    }
-  }
+  if (ret < 0)
+    goto send_error;
 
-  GST_DEBUG ("sent %" G_GSIZE_FORMAT " bytes", size);
+  GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret);
 
   return GST_FLOW_OK;
 
 send_error:
   {
-    GST_DEBUG ("got send error %s (%d)", g_strerror (errno), errno);
+    GST_DEBUG ("got send error %s", err->message);
+    g_clear_error (&err);
+    return GST_FLOW_ERROR;
+  }
+invalid_family:
+  {
+    GST_DEBUG ("invalid family (got %d, expected %d)",
+        g_socket_address_get_family (addr), sink->family);
     return GST_FLOW_ERROR;
   }
 }
@@ -248,17 +248,28 @@ gst_dynudpsink_set_property (GObject * object, guint prop_id,
   udpsink = GST_DYNUDPSINK (object);
 
   switch (prop_id) {
-    case PROP_SOCKFD:
-      if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock &&
-          udpsink->closefd)
-        CLOSE_SOCKET (udpsink->sockfd);
-      udpsink->sockfd = g_value_get_int (value);
-      GST_DEBUG ("setting SOCKFD to %d", udpsink->sockfd);
+    case PROP_SOCKET:
+      if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
+          udpsink->close_socket) {
+        GError *err = NULL;
+
+        if (!g_socket_close (udpsink->socket, &err)) {
+          GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
+              err->message);
+          g_clear_error (&err);
+        }
+      }
+      if (udpsink->socket)
+        g_object_unref (udpsink->socket);
+      udpsink->socket = g_value_dup_object (value);
+      GST_DEBUG ("setting socket to %p", udpsink->socket);
       break;
-    case PROP_CLOSEFD:
-      udpsink->closefd = g_value_get_boolean (value);
+    case PROP_CLOSE_SOCKET:
+      udpsink->close_socket = g_value_get_boolean (value);
+      break;
+    case PROP_FAMILY:
+      udpsink->family = g_value_get_enum (value);
       break;
-
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -274,11 +285,14 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
   udpsink = GST_DYNUDPSINK (object);
 
   switch (prop_id) {
-    case PROP_SOCKFD:
-      g_value_set_int (value, udpsink->sockfd);
+    case PROP_SOCKET:
+      g_value_set_object (value, udpsink->socket);
       break;
-    case PROP_CLOSEFD:
-      g_value_set_boolean (value, udpsink->closefd);
+    case PROP_CLOSE_SOCKET:
+      g_value_set_boolean (value, udpsink->close_socket);
+      break;
+    case PROP_FAMILY:
+      g_value_set_enum (value, udpsink->family);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -286,87 +300,90 @@ gst_dynudpsink_get_property (GObject * object, guint prop_id, GValue * value,
   }
 }
 
-
 /* create a socket for sending to remote machine */
 static gboolean
-gst_dynudpsink_init_send (GstDynUDPSink * sink)
+gst_dynudpsink_start (GstBaseSink * bsink)
 {
-  guint bc_val;
+  GstDynUDPSink *udpsink;
+  GError *err = NULL;
+
+  udpsink = GST_DYNUDPSINK (bsink);
 
-  if (sink->sockfd == -1) {
+  if (udpsink->socket == NULL) {
     /* create sender socket if none available */
-    if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+    if ((udpsink->used_socket =
+            g_socket_new (udpsink->family,
+                G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
       goto no_socket;
 
-    bc_val = 1;
-    if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
-            sizeof (bc_val)) < 0)
-      goto no_broadcast;
-
-    sink->externalfd = TRUE;
+    g_socket_set_broadcast (udpsink->used_socket, TRUE);
+    udpsink->external_socket = FALSE;
   } else {
-    sink->sock = sink->sockfd;
-    sink->externalfd = TRUE;
+    udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket));
+    udpsink->external_socket = TRUE;
   }
+
   return TRUE;
 
   /* ERRORS */
 no_socket:
   {
-    perror ("socket");
-    return FALSE;
-  }
-no_broadcast:
-  {
-    perror ("setsockopt");
-    CLOSE_IF_REQUESTED (sink);
+    GST_ERROR_OBJECT (udpsink, "Failed to create socket: %s", err->message);
+    g_clear_error (&err);
     return FALSE;
   }
 }
 
-GValueArray *
+GstStructure *
 gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port)
 {
   return NULL;
 }
 
-static void
-gst_dynudpsink_close (GstDynUDPSink * sink)
+static gboolean
+gst_dynudpsink_stop (GstBaseSink * bsink)
 {
-  CLOSE_IF_REQUESTED (sink);
+  GstDynUDPSink *udpsink;
+
+  udpsink = GST_DYNUDPSINK (bsink);
+
+  if (udpsink->used_socket) {
+    if (udpsink->close_socket || !udpsink->external_socket) {
+      GError *err = NULL;
+
+      if (!g_socket_close (udpsink->used_socket, &err)) {
+        GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
+        g_clear_error (&err);
+      }
+    }
+
+    g_object_unref (udpsink->used_socket);
+    udpsink->used_socket = NULL;
+  }
+
+  return TRUE;
 }
 
-static GstStateChangeReturn
-gst_dynudpsink_change_state (GstElement * element, GstStateChange transition)
+static gboolean
+gst_dynudpsink_unlock (GstBaseSink * bsink)
 {
-  GstStateChangeReturn ret;
-  GstDynUDPSink *sink;
+  GstDynUDPSink *udpsink;
 
-  sink = GST_DYNUDPSINK (element);
+  udpsink = GST_DYNUDPSINK (bsink);
 
-  switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-      if (!gst_dynudpsink_init_send (sink))
-        goto no_init;
-      break;
-    default:
-      break;
-  }
+  g_cancellable_cancel (udpsink->cancellable);
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  return TRUE;
+}
 
-  switch (transition) {
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      gst_dynudpsink_close (sink);
-      break;
-    default:
-      break;
-  }
-  return ret;
+static gboolean
+gst_dynudpsink_unlock_stop (GstBaseSink * bsink)
+{
+  GstDynUDPSink *udpsink;
 
-  /* ERRORS */
-no_init:
-  {
-    return GST_STATE_CHANGE_FAILURE;
-  }
+  udpsink = GST_DYNUDPSINK (bsink);
+
+  g_cancellable_reset (udpsink->cancellable);
+
+  return TRUE;
 }
index d46dfdf..a59d875 100644 (file)
 
 #include <gst/gst.h>
 #include <gst/base/gstbasesink.h>
+#include <gio/gio.h>
 
 G_BEGIN_DECLS
 
 #include "gstudpnetutils.h"
-
 #include "gstudp.h"
 
 #define GST_TYPE_DYNUDPSINK             (gst_dynudpsink_get_type())
@@ -45,26 +45,28 @@ struct _GstDynUDPSink {
   GstBaseSink parent;
 
   /* properties */
-  gint sockfd;
-  gboolean closefd;
+  GSocket *socket;
+  gboolean close_socket;
+  GSocketFamily family;
 
   /* the socket in use */
-  int sock;
-  gboolean externalfd;
+  GSocket *used_socket;
+  gboolean external_socket;
+  GCancellable *cancellable;
 };
 
 struct _GstDynUDPSinkClass {
   GstBaseSinkClass parent_class;
 
   /* element methods */
-  GValueArray*  (*get_stats)    (GstDynUDPSink *sink, const gchar *host, gint port);
+  GstStructure*  (*get_stats)    (GstDynUDPSink *sink, const gchar *host, gint port);
 
   /* signals */
 };
 
 GType gst_dynudpsink_get_type(void);
 
-GValueArray*    gst_dynudpsink_get_stats        (GstDynUDPSink *sink, const gchar *host, gint port);
+GstStructure*    gst_dynudpsink_get_stats        (GstDynUDPSink *sink, const gchar *host, gint port);
 
 G_END_DECLS