udpsrc: Port to GIO
[platform/upstream/gstreamer.git] / gst / udp / gstudpsrc.c
index d9d45e1..ce913d7 100644 (file)
@@ -1,6 +1,8 @@
 /* GStreamer
  * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
+ * Copyright (C) <2012> Collabora Ltd.
+ *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #endif
 
 #include "gstudpsrc.h"
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <stdlib.h>
-
-#if defined _MSC_VER && (_MSC_VER >= 1400)
-#include <io.h>
-#endif
 
 #include <gst/net/gstnetaddressmeta.h>
-
-#ifdef HAVE_FIONREAD_IN_SYS_FILIO
-#include <sys/filio.h>
-#endif
+#include <sys/socket.h>
 
 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
 #define GST_CAT_DEFAULT (udpsrc_debug)
@@ -135,12 +126,12 @@ static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
 #define UDP_DEFAULT_MULTICAST_IFACE     NULL
 #define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
 #define UDP_DEFAULT_CAPS                NULL
-#define UDP_DEFAULT_SOCKFD              -1
+#define UDP_DEFAULT_SOCKET              NULL
 #define UDP_DEFAULT_BUFFER_SIZE                0
 #define UDP_DEFAULT_TIMEOUT             0
 #define UDP_DEFAULT_SKIP_FIRST_BYTES   0
-#define UDP_DEFAULT_CLOSEFD            TRUE
-#define UDP_DEFAULT_SOCK                -1
+#define UDP_DEFAULT_CLOSE_SOCKET       TRUE
+#define UDP_DEFAULT_USED_SOCKET        NULL
 #define UDP_DEFAULT_AUTO_MULTICAST     TRUE
 #define UDP_DEFAULT_REUSE              TRUE
 
@@ -153,28 +144,18 @@ enum
   PROP_MULTICAST_IFACE,
   PROP_URI,
   PROP_CAPS,
-  PROP_SOCKFD,
+  PROP_SOCKET,
   PROP_BUFFER_SIZE,
   PROP_TIMEOUT,
   PROP_SKIP_FIRST_BYTES,
-  PROP_CLOSEFD,
-  PROP_SOCK,
+  PROP_CLOSE_SOCKET,
+  PROP_USED_SOCKET,
   PROP_AUTO_MULTICAST,
   PROP_REUSE,
 
   PROP_LAST
 };
 
-#define CLOSE_IF_REQUESTED(udpctx)                                        \
-G_STMT_START {                                                            \
-  if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
-    CLOSE_SOCKET(udpctx->sock.fd);                                        \
-    if (udpctx->sock.fd == udpctx->sockfd)                                \
-      udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
-  }                                                                       \
-  udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
-} G_STMT_END
-
 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
 
 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
@@ -240,18 +221,17 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
       g_param_spec_boxed ("caps", "Caps",
           "The caps of the source pad", GST_TYPE_CAPS,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_SOCKFD,
-      g_param_spec_int ("sockfd", "Socket Handle",
-          "Socket to use for UDP reception. (-1 == allocate)",
-          -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_SOCKET,
+      g_param_spec_object ("socket", "Socket",
+          "Socket to use for UDP reception. (NULL == allocate)",
+          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
       g_param_spec_int ("buffer-size", "Buffer Size",
           "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
           UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
       g_param_spec_uint64 ("timeout", "Timeout",
-          "Post a message after timeout microseconds (0 = disabled)", 0,
+          "Post a message after timeout nanoseconds (0 = disabled)", 0,
           G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (G_OBJECT_CLASS (klass),
@@ -259,15 +239,15 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
           "Skip first bytes", "number of bytes to skip for each udp packet", 0,
           G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_CLOSEFD,
-      g_param_spec_boolean ("closefd", "Close sockfd",
-          "Close sockfd if passed as property on state change",
-          UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  g_object_class_install_property (gobject_class, PROP_SOCK,
-      g_param_spec_int ("sock", "Socket Handle",
-          "Socket currently in use for UDP reception. (-1 = no socket)",
-          -1, G_MAXINT, UDP_DEFAULT_SOCK,
-          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
+      g_param_spec_boolean ("close-socket", "Close socket",
+          "Close socket if passed as property on state change",
+          UDP_DEFAULT_CLOSE_SOCKET,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
+      g_param_spec_object ("used-socket", "Socket Handle",
+          "Socket currently in use for UDP reception. (NULL = no socket)",
+          G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
       g_param_spec_boolean ("auto-multicast", "Auto Multicast",
           "Automatically join/leave multicast groups",
@@ -300,20 +280,25 @@ gst_udpsrc_init (GstUDPSrc * udpsrc)
 {
   WSA_STARTUP (udpsrc);
 
-  gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
+  udpsrc->uri =
+      g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
       UDP_DEFAULT_PORT);
 
-  udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
+  udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
+  udpsrc->port = UDP_DEFAULT_PORT;
+  udpsrc->socket = UDP_DEFAULT_SOCKET;
   udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
   udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
   udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
-  udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
-  udpsrc->externalfd = (udpsrc->sockfd != -1);
+  udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
+  udpsrc->external_socket = (udpsrc->socket != NULL);
   udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
-  udpsrc->sock.fd = UDP_DEFAULT_SOCK;
+  udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
   udpsrc->reuse = UDP_DEFAULT_REUSE;
 
+  udpsrc->cancellable = g_cancellable_new ();
+
   /* configure basesrc to be a live source */
   gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
   /* make basesrc output a segment in time */
@@ -332,16 +317,25 @@ gst_udpsrc_finalize (GObject * object)
 
   if (udpsrc->caps)
     gst_caps_unref (udpsrc->caps);
+  udpsrc->caps = NULL;
 
   g_free (udpsrc->multi_iface);
+  udpsrc->multi_iface = NULL;
 
-  gst_udp_uri_free (&udpsrc->uri);
-  g_free (udpsrc->uristr);
+  g_free (udpsrc->uri);
+  udpsrc->uri = NULL;
 
-  if (udpsrc->sockfd >= 0 && udpsrc->closefd)
-    CLOSE_SOCKET (udpsrc->sockfd);
+  if (udpsrc->socket)
+    g_object_unref (udpsrc->socket);
+  udpsrc->socket = NULL;
 
-  WSA_CLEANUP (object);
+  if (udpsrc->used_socket)
+    g_object_unref (udpsrc->used_socket);
+  udpsrc->used_socket = NULL;
+
+  if (udpsrc->cancellable)
+    g_object_unref (udpsrc->cancellable);
+  udpsrc->cancellable = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -353,112 +347,57 @@ gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
 
   udpsrc = GST_UDPSRC (src);
 
-  if (udpsrc->caps)
-    return gst_caps_ref (udpsrc->caps);
-  else
-    return gst_caps_new_any ();
-}
-
-/* read a message from the error queue */
-static void
-clear_error (GstUDPSrc * udpsrc)
-{
-#if defined (MSG_ERRQUEUE)
-  struct msghdr cmsg;
-  char cbuf[128];
-  char msgbuf[CMSG_SPACE (128)];
-  struct iovec iov;
-
-  /* Flush ERRORS from fd so next poll will not return at once */
-  /* No need for address : We look for local error */
-  cmsg.msg_name = NULL;
-  cmsg.msg_namelen = 0;
-
-  /* IOV */
-  memset (&cbuf, 0, sizeof (cbuf));
-  iov.iov_base = cbuf;
-  iov.iov_len = sizeof (cbuf);
-  cmsg.msg_iov = &iov;
-  cmsg.msg_iovlen = 1;
-
-  /* msg_control */
-  memset (&msgbuf, 0, sizeof (msgbuf));
-  cmsg.msg_control = &msgbuf;
-  cmsg.msg_controllen = sizeof (msgbuf);
-
-  recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
-#endif
+  if (udpsrc->caps) {
+    return (filter) ? gst_caps_intersect_full (filter, udpsrc->caps,
+        GST_CAPS_INTERSECT_FIRST) : gst_caps_ref (udpsrc->caps);
+  } else {
+    return (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
+  }
 }
 
 static GstFlowReturn
 gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 {
   GstUDPSrc *udpsrc;
-  GstNetAddressMeta *meta;
   GstBuffer *outbuf;
-  union gst_sockaddr
-  {
-    struct sockaddr sa;
-    struct sockaddr_in sa_in;
-    struct sockaddr_in6 sa_in6;
-    struct sockaddr_storage sa_stor;
-  } sa;
-  socklen_t slen;
+  GSocketAddress *saddr = NULL;
   guint8 *pktdata;
   gint pktsize;
   gsize offset;
-#ifdef G_OS_UNIX
-  gint readsize;
-#elif defined G_OS_WIN32
-  gulong readsize;
-#endif
-  GstClockTime timeout;
-  gint ret;
+  gssize readsize;
+  gssize ret;
   gboolean try_again;
+  GError *err = NULL;
 
   udpsrc = GST_UDPSRC_CAST (psrc);
 
 retry:
   /* quick check, avoid going in select when we already have data */
-  readsize = 0;
-  if (G_UNLIKELY ((ret =
-              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
-    goto ioctl_failed;
-
+  readsize = g_socket_get_available_bytes (udpsrc->used_socket);
   if (readsize > 0)
     goto no_select;
 
-  if (udpsrc->timeout > 0) {
-    timeout = udpsrc->timeout * GST_USECOND;
-  } else {
-    timeout = GST_CLOCK_TIME_NONE;
-  }
-
   do {
     try_again = FALSE;
 
     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
         udpsrc->timeout);
 
-    ret = gst_poll_wait (udpsrc->fdset, timeout);
-    GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
-    if (G_UNLIKELY (ret < 0)) {
-      if (errno == EBUSY)
+    if (!g_socket_condition_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
+            udpsrc->cancellable, &err)) {
+      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
+          || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
         goto stopped;
-#ifdef G_OS_WIN32
-      if (WSAGetLastError () != WSAEINTR)
-        goto select_error;
-#else
-      if (errno != EAGAIN && errno != EINTR)
+      } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+        /* timeout, post element message */
+        gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
+            gst_message_new_element (GST_OBJECT_CAST (udpsrc),
+                gst_structure_new ("GstUDPSrcTimeout",
+                    "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
+      } else {
         goto select_error;
-#endif
-      try_again = TRUE;
-    } else if (G_UNLIKELY (ret == 0)) {
-      /* timeout, post element message */
-      gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
-          gst_message_new_element (GST_OBJECT_CAST (udpsrc),
-              gst_structure_new ("GstUDPSrcTimeout",
-                  "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
+      }
+
       try_again = TRUE;
     }
   } while (G_UNLIKELY (try_again));
@@ -466,10 +405,9 @@ retry:
   /* ask how much is available for reading on the socket, this should be exactly
    * one UDP packet. We will check the return value, though, because in some
    * case it can return 0 and we don't want a 0 sized buffer. */
-  readsize = 0;
-  if (G_UNLIKELY ((ret =
-              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
-    goto ioctl_failed;
+  readsize = g_socket_get_available_bytes (udpsrc->used_socket);
+  if (G_UNLIKELY (readsize < 0))
+    goto get_available_error;
 
   /* If we get here and the readsize is zero, then either select was woken up
    * by activity that is not a read, or a poll error occurred, or a UDP packet
@@ -479,11 +417,14 @@ retry:
   if (G_UNLIKELY (!readsize)) {
     /* try to read a packet (and it will be ignored),
      * in case a packet with no data arrived */
-    slen = sizeof (sa);
-    recvfrom (udpsrc->sock.fd, (char *) &slen, 0, 0, &sa.sa, &slen);
 
-    /* clear any error, in case a poll error occurred */
-    clear_error (udpsrc);
+    pktdata = NULL;
+    pktsize = 0;
+    ret =
+        g_socket_receive_from (udpsrc->used_socket, NULL, (gchar *) pktdata,
+        pktsize, udpsrc->cancellable, &err);
+    if (G_UNLIKELY (ret < 0))
+      goto receive_error;
 
     /* poll again */
     goto retry;
@@ -496,33 +437,16 @@ no_select:
   pktsize = readsize;
   offset = 0;
 
-  while (TRUE) {
-    slen = sizeof (sa);
-#ifdef G_OS_WIN32
-    ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
-        &slen);
-#else
-    ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
-#endif
-    if (G_UNLIKELY (ret < 0)) {
-#ifdef G_OS_WIN32
-      /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
-       * generated a "port unreachable" ICMP response. We ignore that and try
-       * again. */
-      if (WSAGetLastError () == WSAECONNRESET) {
-        g_free (pktdata);
-        pktdata = NULL;
-        goto retry;
-      }
-      if (WSAGetLastError () != WSAEINTR)
-        goto receive_error;
-#else
-      if (errno != EAGAIN && errno != EINTR)
-        goto receive_error;
-#endif
-    } else
-      break;
-  }
+  if (saddr)
+    g_object_unref (saddr);
+  saddr = NULL;
+
+  ret =
+      g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) pktdata,
+      pktsize, udpsrc->cancellable, &err);
+
+  if (G_UNLIKELY (ret < 0))
+    goto receive_error;
 
   /* patch pktdata and len when stripping off the headers */
   if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
@@ -538,31 +462,12 @@ no_select:
       gst_memory_new_wrapped (0, pktdata, g_free, pktsize, offset, ret));
 
   /* use buffer metadata so receivers can also track the address */
-  meta = gst_buffer_add_net_address_meta (outbuf);
-
-  switch (sa.sa.sa_family) {
-    case AF_INET:
-    {
-      gst_net_address_set_ip4_address (&meta->naddr, sa.sa_in.sin_addr.s_addr,
-          sa.sa_in.sin_port);
-    }
-      break;
-    case AF_INET6:
-    {
-      guint8 ip6[16];
-
-      memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
-      gst_net_address_set_ip6_address (&meta->naddr, ip6, sa.sa_in6.sin6_port);
-    }
-      break;
-    default:
-#ifdef G_OS_WIN32
-      WSASetLastError (WSAEAFNOSUPPORT);
-#else
-      errno = EAFNOSUPPORT;
-#endif
-      goto receive_error;
+  if (saddr) {
+    gst_buffer_add_net_address_meta (outbuf, saddr);
+    g_object_unref (saddr);
   }
+  saddr = NULL;
+
   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
 
   *buf = GST_BUFFER_CAST (outbuf);
@@ -573,7 +478,8 @@ no_select:
 select_error:
   {
     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
-        ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
+        ("select error: %s", err->message));
+    g_clear_error (&err);
     return GST_FLOW_ERROR;
   }
 stopped:
@@ -581,23 +487,26 @@ stopped:
     GST_DEBUG ("stop called");
     return GST_FLOW_WRONG_STATE;
   }
-ioctl_failed:
+get_available_error:
   {
     GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
-        ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
+        ("get available bytes failed"));
     return GST_FLOW_ERROR;
   }
 receive_error:
   {
     g_free (pktdata);
-#ifdef G_OS_WIN32
-    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
-        ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
-#else
-    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
-        ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
-#endif
-    return GST_FLOW_ERROR;
+
+    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
+        g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      g_clear_error (&err);
+      return GST_FLOW_WRONG_STATE;
+    } else {
+      GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+          ("receive error %d: %s", ret, err->message));
+      g_clear_error (&err);
+      return GST_FLOW_ERROR;
+    }
   }
 skip_error:
   {
@@ -610,11 +519,21 @@ skip_error:
 static gboolean
 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
 {
-  if (gst_udp_parse_uri (uri, &src->uri) < 0)
+  gchar *host;
+  guint16 port;
+
+  if (!gst_udp_parse_uri (uri, &host, &port))
     goto wrong_uri;
 
-  if (src->uri.port == -1)
-    src->uri.port = UDP_DEFAULT_PORT;
+  if (port == -1)
+    port = UDP_DEFAULT_PORT;
+
+  g_free (src->host);
+  src->host = host;
+  src->port = port;
+
+  g_free (src->uri);
+  src->uri = g_strdup (uri);
 
   return TRUE;
 
@@ -640,16 +559,22 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
       udpsrc->buffer_size = g_value_get_int (value);
       break;
     case PROP_PORT:
-      gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
+      udpsrc->port = g_value_get_int (value);
+      g_free (udpsrc->uri);
+      udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
       break;
     case PROP_MULTICAST_GROUP:
     {
       const gchar *group;
 
+      g_free (udpsrc->host);
       if ((group = g_value_get_string (value)))
-        gst_udp_uri_update (&udpsrc->uri, group, -1);
+        udpsrc->host = g_strdup (group);
       else
-        gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
+        udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
+
+      g_free (udpsrc->uri);
+      udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
       break;
     }
     case PROP_MULTICAST_IFACE:
@@ -684,12 +609,21 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
       gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
       break;
     }
-    case PROP_SOCKFD:
-      if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
-          udpsrc->closefd)
-        CLOSE_SOCKET (udpsrc->sockfd);
-      udpsrc->sockfd = g_value_get_int (value);
-      GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
+    case PROP_SOCKET:
+      if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
+          udpsrc->close_socket) {
+        GError *err = NULL;
+
+        if (!g_socket_close (udpsrc->socket, &err)) {
+          GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
+              err->message);
+          g_clear_error (&err);
+        }
+      }
+      if (udpsrc->socket)
+        g_object_unref (udpsrc->socket);
+      udpsrc->socket = g_value_dup_object (value);
+      GST_DEBUG ("setting socket to %p", udpsrc->socket);
       break;
     case PROP_TIMEOUT:
       udpsrc->timeout = g_value_get_uint64 (value);
@@ -697,8 +631,8 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
     case PROP_SKIP_FIRST_BYTES:
       udpsrc->skip_first_bytes = g_value_get_int (value);
       break;
-    case PROP_CLOSEFD:
-      udpsrc->closefd = g_value_get_boolean (value);
+    case PROP_CLOSE_SOCKET:
+      udpsrc->close_socket = g_value_get_boolean (value);
       break;
     case PROP_AUTO_MULTICAST:
       udpsrc->auto_multicast = g_value_get_boolean (value);
@@ -722,22 +656,22 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
       g_value_set_int (value, udpsrc->buffer_size);
       break;
     case PROP_PORT:
-      g_value_set_int (value, udpsrc->uri.port);
+      g_value_set_int (value, udpsrc->port);
       break;
     case PROP_MULTICAST_GROUP:
-      g_value_set_string (value, udpsrc->uri.host);
+      g_value_set_string (value, udpsrc->host);
       break;
     case PROP_MULTICAST_IFACE:
       g_value_set_string (value, udpsrc->multi_iface);
       break;
     case PROP_URI:
-      g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
+      g_value_take_string (value, udpsrc->uri);
       break;
     case PROP_CAPS:
       gst_value_set_caps (value, udpsrc->caps);
       break;
-    case PROP_SOCKFD:
-      g_value_set_int (value, udpsrc->sockfd);
+    case PROP_SOCKET:
+      g_value_set_object (value, udpsrc->socket);
       break;
     case PROP_TIMEOUT:
       g_value_set_uint64 (value, udpsrc->timeout);
@@ -745,11 +679,11 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SKIP_FIRST_BYTES:
       g_value_set_int (value, udpsrc->skip_first_bytes);
       break;
-    case PROP_CLOSEFD:
-      g_value_set_boolean (value, udpsrc->closefd);
+    case PROP_CLOSE_SOCKET:
+      g_value_set_boolean (value, udpsrc->close_socket);
       break;
-    case PROP_SOCK:
-      g_value_set_int (value, udpsrc->sock.fd);
+    case PROP_USED_SOCKET:
+      g_value_set_object (value, udpsrc->used_socket);
       break;
     case PROP_AUTO_MULTICAST:
       g_value_set_boolean (value, udpsrc->auto_multicast);
@@ -767,204 +701,201 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
 static gboolean
 gst_udpsrc_start (GstBaseSrc * bsrc)
 {
-  guint bc_val;
-  guint err_val;
-  gint reuse;
-  int port;
   GstUDPSrc *src;
-  gint ret;
-  int rcvsize;
-  struct sockaddr_storage bind_address;
-  socklen_t len;
+  GInetAddress *addr, *bind_addr;
+  GSocketAddress *bind_saddr;
+  GResolver *resolver;
+  GError *err = NULL;
+
   src = GST_UDPSRC (bsrc);
 
-  if (src->sockfd == -1) {
+  if (src->socket == NULL) {
     /* need to allocate a socket */
-    GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
-        src->uri.port);
-    if ((ret =
-            gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
-      goto getaddrinfo_error;
-
-    if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
-      goto no_socket;
+    GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->host, src->port);
+
+    addr = g_inet_address_new_from_string (src->host);
+    if (!addr) {
+      GList *results;
+
+      resolver = g_resolver_get_default ();
+      results =
+          g_resolver_lookup_by_name (resolver, src->host, src->cancellable,
+          &err);
+      if (!results)
+        goto name_resolve;
+      addr = G_INET_ADDRESS (g_object_ref (results->data));
+
+      g_resolver_free_addresses (results);
+      g_object_unref (resolver);
+    }
+#ifndef GST_DISABLE_GST_DEBUG
+    {
+      gchar *ip = g_inet_address_to_string (addr);
 
-    src->sock.fd = ret;
-    src->externalfd = FALSE;
-
-    GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
-
-    GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
-    reuse = src->reuse ? 1 : 0;
-    if ((ret =
-            setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
-                sizeof (reuse))) < 0)
-      goto setsockopt_error;
-
-    GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
-
-    /* Take a temporary copy of the address in case we need to fix it for bind */
-    memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
-
-#ifdef G_OS_WIN32
-    /* Windows does not allow binding to a multicast group so fix source address */
-    if (gst_udp_is_multicast (&src->myaddr)) {
-      switch (((struct sockaddr *) &bind_address)->sa_family) {
-        case AF_INET:
-          ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
-              htonl (INADDR_ANY);
-          break;
-        case AF_INET6:
-          ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
-          break;
-        default:
-          break;
-      }
+      GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+      g_free (ip);
     }
 #endif
 
-    len = gst_udp_get_sockaddr_length (&bind_address);
-    if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
+    if ((src->used_socket =
+            g_socket_new (g_inet_address_get_family (addr),
+                G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
+      goto no_socket;
+
+    src->external_socket = FALSE;
+
+    GST_DEBUG_OBJECT (src, "got socket %p", src->used_socket);
+
+    if (src->addr)
+      g_object_unref (src->addr);
+    src->addr =
+        G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
+
+    GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
+
+    if (g_inet_address_get_is_multicast (addr))
+      bind_addr = g_inet_address_new_any (g_inet_address_get_family (addr));
+    else
+      bind_addr = G_INET_ADDRESS (g_object_ref (addr));
+
+    g_object_unref (addr);
+
+    bind_saddr = g_inet_socket_address_new (bind_addr, src->port);
+    g_object_unref (bind_addr);
+    if (!g_socket_bind (src->used_socket, bind_saddr, src->reuse, &err))
       goto bind_error;
 
-    if (!gst_udp_is_multicast (&src->myaddr)) {
-      len = sizeof (src->myaddr);
-      if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
-                  &len)) < 0)
-        goto getsockname_error;
-    }
+    g_object_unref (bind_saddr);
   } else {
-    GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
+    GST_DEBUG_OBJECT (src, "using provided socket %d", src->socket);
     /* we use the configured socket, try to get some info about it */
-    len = sizeof (src->myaddr);
-    if ((ret =
-            getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
-                &len)) < 0)
+    src->used_socket = G_SOCKET (g_object_ref (src->socket));
+    src->external_socket = TRUE;
+
+    if (src->addr)
+      g_object_unref (src->addr);
+    src->addr =
+        G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
+            &err));
+    if (!src->addr)
       goto getsockname_error;
-
-    src->sock.fd = src->sockfd;
-    src->externalfd = TRUE;
   }
 
-  len = sizeof (rcvsize);
-  if (src->buffer_size != 0) {
-    rcvsize = src->buffer_size;
+  if (src->timeout)
+    g_socket_set_timeout (src->used_socket, src->timeout / GST_SECOND);
 
-    GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
-    /* set buffer size, Note that on Linux this is typically limited to a
-     * maximum of around 100K. Also a minimum of 128 bytes is required on
-     * Linux. */
-    ret =
-        setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
-        len);
-    if (ret != 0) {
-      GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
-          ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
-              rcvsize, ret, g_strerror (errno), errno));
+#ifdef SO_RECVBUF
+  {
+    gint rcvsize, ret;
+
+    len = sizeof (rcvsize);
+    if (src->buffer_size != 0) {
+      rcvsize = src->buffer_size;
+
+      GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
+      /* set buffer size, Note that on Linux this is typically limited to a
+       * maximum of around 100K. Also a minimum of 128 bytes is required on
+       * Linux. */
+      ret =
+          setsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
+          (void *) &rcvsize, len);
+      if (ret != 0) {
+        GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+            ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
+                rcvsize, ret, g_strerror (errno), errno));
+      }
     }
-  }
 
-  /* read the value of the receive buffer. Note that on linux this returns 2x the
-   * value we set because the kernel allocates extra memory for metadata.
-   * The default on Linux is about 100K (which is about 50K without metadata) */
-  ret =
-      getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
-  if (ret == 0)
-    GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
-  else
-    GST_DEBUG_OBJECT (src, "could not get udp buffer size");
-
-  bc_val = 1;
-  if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
-              sizeof (bc_val))) < 0) {
-    GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
-        ("could not configure socket for broadcast %d: %s (%d)", ret,
-            g_strerror (errno), errno));
-  }
-
-  /* Accept ERRQUEUE to get and flush icmp errors */
-  err_val = 1;
-#if defined (IP_RECVERR)
-  if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
-              sizeof (err_val))) < 0) {
-    GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
-        ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
-            g_strerror (errno), errno));
+    /* read the value of the receive buffer. Note that on linux this returns 2x the
+     * value we set because the kernel allocates extra memory for metadata.
+     * The default on Linux is about 100K (which is about 50K without metadata) */
+    ret =
+        getsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
+        (void *) &rcvsize, &len);
+    if (ret == 0)
+      GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
+    else
+      GST_DEBUG_OBJECT (src, "could not get udp buffer size");
   }
 #endif
 
-  if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
-    GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
-    ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
-    if (ret < 0)
+  g_socket_set_broadcast (src->used_socket, TRUE);
+
+  if (src->auto_multicast
+      &&
+      g_inet_address_get_is_multicast (g_inet_socket_address_get_address
+          (src->addr))) {
+    GST_DEBUG_OBJECT (src, "joining multicast group %s", src->host);
+    if (!g_socket_join_multicast_group (src->used_socket,
+            g_inet_socket_address_get_address (src->addr),
+            FALSE, src->multi_iface, &err))
       goto membership;
   }
 
   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
    * follows ss_family on both */
-  port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
-  GST_DEBUG_OBJECT (src, "bound, on port %d", port);
-  if (port != src->uri.port) {
-    src->uri.port = port;
-    GST_DEBUG_OBJECT (src, "notifying port %d", port);
-    g_object_notify (G_OBJECT (src), "port");
-  }
+  {
+    GInetSocketAddress *addr;
+    guint16 port;
 
-  if ((src->fdset = gst_poll_new (TRUE)) == NULL)
-    goto no_fdset;
+    addr =
+        G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
+            &err));
+    if (!addr)
+      goto getsockname_error;
 
-  gst_poll_add_fd (src->fdset, &src->sock);
-  gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
+    port = g_inet_socket_address_get_port (addr);
+    GST_DEBUG_OBJECT (src, "bound, on port %d", port);
+    if (port != src->port) {
+      src->port = port;
+      GST_DEBUG_OBJECT (src, "notifying port %d", port);
+      g_object_notify (G_OBJECT (src), "port");
+    }
+    g_object_unref (addr);
+  }
 
   return TRUE;
 
   /* ERRORS */
-getaddrinfo_error:
+name_resolve:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
+        ("Name resolval failed: %s", err->message));
+    g_clear_error (&err);
+    g_object_unref (resolver);
     return FALSE;
   }
 no_socket:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
-        ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
-    return FALSE;
-  }
-setsockopt_error:
-  {
-    CLOSE_IF_REQUESTED (src);
-    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
+        ("no socket error: %s", err->message));
+    g_clear_error (&err);
+    g_object_unref (addr);
     return FALSE;
   }
 bind_error:
   {
-    CLOSE_IF_REQUESTED (src);
+    gst_udpsrc_stop (GST_BASE_SRC (src));
     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
+        ("bind failed: %s", err->message));
+    g_clear_error (&err);
+    g_object_unref (bind_saddr);
     return FALSE;
   }
 membership:
   {
-    CLOSE_IF_REQUESTED (src);
+    gst_udpsrc_stop (GST_BASE_SRC (src));
     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
+        ("could add membership: %s", err->message));
+    g_clear_error (&err);
     return FALSE;
   }
 getsockname_error:
   {
-    CLOSE_IF_REQUESTED (src);
+    gst_udpsrc_stop (GST_BASE_SRC (src));
     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
-    return FALSE;
-  }
-no_fdset:
-  {
-    CLOSE_IF_REQUESTED (src);
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
-            errno));
+        ("getsockname failed: %s", err->message));
+    g_clear_error (&err);
     return FALSE;
   }
 }
@@ -977,7 +908,7 @@ gst_udpsrc_unlock (GstBaseSrc * bsrc)
   src = GST_UDPSRC (bsrc);
 
   GST_LOG_OBJECT (src, "Flushing");
-  gst_poll_set_flushing (src->fdset, TRUE);
+  g_cancellable_cancel (src->cancellable);
 
   return TRUE;
 }
@@ -990,7 +921,7 @@ gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
   src = GST_UDPSRC (bsrc);
 
   GST_LOG_OBJECT (src, "No longer flushing");
-  gst_poll_set_flushing (src->fdset, FALSE);
+  g_cancellable_reset (src->cancellable);
 
   return TRUE;
 }
@@ -1004,17 +935,36 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
 
   GST_DEBUG ("stopping, closing sockets");
 
-  if (src->sock.fd >= 0) {
-    if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
-      GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
-      gst_udp_leave_group (src->sock.fd, &src->myaddr);
+  if (src->used_socket) {
+    if (src->auto_multicast
+        &&
+        g_inet_address_get_is_multicast (g_inet_socket_address_get_address
+            (src->addr))) {
+      GError *err = NULL;
+
+      GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->host);
+
+      if (!g_socket_leave_multicast_group (src->used_socket,
+              g_inet_socket_address_get_address (src->addr), FALSE,
+              src->multi_iface, NULL)) {
+        GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
+            err->message);
+        g_clear_error (&err);
+      }
+    }
+
+    if (src->close_socket || !src->external_socket) {
+      GError *err = NULL;
+      if (!g_socket_close (src->used_socket, &err)) {
+        GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+        g_clear_error (&err);
+      }
     }
-    CLOSE_IF_REQUESTED (src);
-  }
 
-  if (src->fdset) {
-    gst_poll_free (src->fdset);
-    src->fdset = NULL;
+    g_object_unref (src->used_socket);
+    src->used_socket = NULL;
+    g_object_unref (src->addr);
+    src->addr = NULL;
   }
 
   return TRUE;
@@ -1041,11 +991,7 @@ gst_udpsrc_uri_get_uri (GstURIHandler * handler)
 {
   GstUDPSrc *src = GST_UDPSRC (handler);
 
-  /* FIXME: make thread-safe; maybe we can get rid of this assignment here? */
-  g_free (src->uristr);
-  src->uristr = gst_udp_uri_string (&src->uri);
-
-  return g_strdup (src->uristr);
+  return g_strdup (src->uri);
 }
 
 static gboolean