(multi)udpsink: Use separate sockets for IPv4 and IPv6
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 25 Apr 2013 10:12:23 +0000 (12:12 +0200)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 25 Apr 2013 10:12:23 +0000 (12:12 +0200)
https://bugzilla.gnome.org/show_bug.cgi?id=534243

gst/udp/gstmultiudpsink.c
gst/udp/gstmultiudpsink.h

index dc51a10..c378ded 100644 (file)
@@ -99,8 +99,10 @@ enum
   PROP_BYTES_TO_SERVE,
   PROP_BYTES_SERVED,
   PROP_SOCKET,
+  PROP_SOCKET_V6,
   PROP_CLOSE_SOCKET,
   PROP_USED_SOCKET,
+  PROP_USED_SOCKET_V6,
   PROP_CLIENTS,
   PROP_AUTO_MULTICAST,
   PROP_MULTICAST_IFACE,
@@ -259,6 +261,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
       g_param_spec_object ("socket", "Socket Handle",
           "Socket to use for UDP sending. (NULL == allocate)",
           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
+      g_param_spec_object ("socket-v6", "Socket Handle IPv6",
+          "Socket to use for UDPv6 sending. (NULL == allocate)",
+          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
       g_param_spec_boolean ("close-socket", "Close socket",
           "Close socket if passed as property on state change",
@@ -267,6 +273,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
       g_param_spec_object ("used-socket", "Used Socket Handle",
           "Socket currently in use for UDP sending. (NULL == no socket)",
           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
+      g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
+          "Socket currently in use for UDPv6 sending. (NULL == no socket)",
+          G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_CLIENTS,
       g_param_spec_string ("clients", "Clients",
           "A comma separated list of host:port pairs with destinations",
@@ -303,8 +313,8 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
    */
   g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
       g_param_spec_boolean ("force-ipv4", "Force IPv4",
-          "Forcing the use of an IPv4 socket", DEFAULT_FORCE_IPV4,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
+          DEFAULT_FORCE_IPV4, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
@@ -359,7 +369,9 @@ gst_multiudpsink_init (GstMultiUDPSink * sink)
 
   g_mutex_init (&sink->client_lock);
   sink->socket = DEFAULT_SOCKET;
+  sink->socket_v6 = DEFAULT_SOCKET;
   sink->used_socket = DEFAULT_USED_SOCKET;
+  sink->used_socket_v6 = DEFAULT_USED_SOCKET;
   sink->close_socket = DEFAULT_CLOSE_SOCKET;
   sink->external_socket = (sink->socket != NULL);
   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
@@ -461,10 +473,18 @@ gst_multiudpsink_finalize (GObject * object)
     g_object_unref (sink->socket);
   sink->socket = NULL;
 
+  if (sink->socket_v6)
+    g_object_unref (sink->socket_v6);
+  sink->socket_v6 = NULL;
+
   if (sink->used_socket)
     g_object_unref (sink->used_socket);
   sink->used_socket = NULL;
 
+  if (sink->used_socket_v6)
+    g_object_unref (sink->used_socket_v6);
+  sink->used_socket_v6 = NULL;
+
   if (sink->cancellable)
     g_object_unref (sink->cancellable);
   sink->cancellable = NULL;
@@ -527,6 +547,8 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
   num = 0;
   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
     GstUDPClient *client;
+    GSocket *socket;
+    GSocketFamily family;
     gint count;
 
     client = (GstUDPClient *) clients->data;
@@ -534,13 +556,20 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
     GST_LOG_OBJECT (sink, "sending %" G_GSIZE_FORMAT " bytes to client %p",
         size, client);
 
+    family = g_socket_address_get_family (G_SOCKET_ADDRESS (client->addr));
+    /* Select socket to send from for this address */
+    if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
+      socket = sink->used_socket_v6;
+    else
+      socket = sink->used_socket;
+
     count = sink->send_duplicates ? client->refcount : 1;
 
     while (count--) {
       gssize ret;
 
       ret =
-          g_socket_send_message (sink->used_socket, client->addr, vec, n_mem,
+          g_socket_send_message (socket, client->addr, vec, n_mem,
           NULL, 0, 0, sink->cancellable, &err);
 
       if (G_UNLIKELY (ret < 0)) {
@@ -655,13 +684,13 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
 }
 
 static void
-gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink)
+gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
 {
   /* don't touch on -1 */
   if (sink->qos_dscp < 0)
     return;
 
-  if (sink->used_socket == NULL)
+  if (socket == NULL)
     return;
 
 #ifdef IP_TOS
@@ -669,7 +698,7 @@ gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink)
     gint tos;
     gint fd;
 
-    fd = g_socket_get_fd (sink->used_socket);
+    fd = g_socket_get_fd (socket);
 
     GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);
 
@@ -713,6 +742,23 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id,
       udpsink->socket = g_value_dup_object (value);
       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
       break;
+    case PROP_SOCKET_V6:
+      if (udpsink->socket_v6 != NULL
+          && udpsink->socket_v6 != udpsink->used_socket_v6
+          && udpsink->close_socket) {
+        GError *err = NULL;
+
+        if (!g_socket_close (udpsink->socket_v6, &err)) {
+          GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
+              err->message);
+          g_clear_error (&err);
+        }
+      }
+      if (udpsink->socket_v6)
+        g_object_unref (udpsink->socket_v6);
+      udpsink->socket_v6 = g_value_dup_object (value);
+      GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
+      break;
     case PROP_CLOSE_SOCKET:
       udpsink->close_socket = g_value_get_boolean (value);
       break;
@@ -744,7 +790,8 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id,
       break;
     case PROP_QOS_DSCP:
       udpsink->qos_dscp = g_value_get_int (value);
-      gst_multiudpsink_setup_qos_dscp (udpsink);
+      gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
+      gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
       break;
     case PROP_SEND_DUPLICATES:
       udpsink->send_duplicates = g_value_get_boolean (value);
@@ -825,25 +872,36 @@ gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
 {
   GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
   GInetAddress *addr = g_inet_socket_address_get_address (saddr);
+  GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
+  GSocket *socket;
   GError *err = NULL;
 
   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
 
+  if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
+    goto invalid_family;
+
+  /* Select socket to send from for this address */
+  if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
+    socket = sink->used_socket_v6;
+  else
+    socket = sink->used_socket;
+
   if (g_inet_address_get_is_multicast (addr)) {
     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
     if (sink->auto_multicast) {
       GST_DEBUG_OBJECT (sink, "autojoining group");
-      if (!g_socket_join_multicast_group (sink->used_socket, addr, FALSE,
+      if (!g_socket_join_multicast_group (socket, addr, FALSE,
               sink->multi_iface, &err))
         goto join_group_failed;
     }
     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
-    g_socket_set_multicast_loopback (sink->used_socket, sink->loop);
+    g_socket_set_multicast_loopback (socket, sink->loop);
     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
-    g_socket_set_multicast_ttl (sink->used_socket, sink->ttl_mc);
+    g_socket_set_multicast_ttl (socket, sink->ttl_mc);
   } else {
     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
-    g_socket_set_ttl (sink->used_socket, sink->ttl);
+    g_socket_set_ttl (socket, sink->ttl);
   }
   return TRUE;
 
@@ -857,6 +915,13 @@ join_group_failed:
     g_clear_error (&err);
     return FALSE;
   }
+invalid_family:
+  {
+    gst_multiudpsink_stop (GST_BASE_SINK (sink));
+    GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
+        ("Invalid address family (got %d)", family));
+    return FALSE;
+  }
 }
 
 /* create a socket for sending to remote machine */
@@ -870,26 +935,47 @@ gst_multiudpsink_start (GstBaseSink * bsink)
 
   sink = GST_MULTIUDPSINK (bsink);
 
-  if (sink->socket == NULL) {
-    GST_DEBUG_OBJECT (sink, "creating sockets");
-    /* create sender socket try IP6, fall back to IP4 */
-    if (sink->force_ipv4 || (sink->used_socket =
-            g_socket_new (G_SOCKET_FAMILY_IPV6,
-                G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
-      if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
-                  G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
-        goto no_socket;
-    }
+  sink->external_socket = FALSE;
 
-    GST_DEBUG_OBJECT (sink, "have socket");
-    sink->external_socket = FALSE;
-  } else {
+  if (sink->socket) {
     GST_DEBUG_OBJECT (sink, "using configured socket");
-    /* we use the configured socket */
-    sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
+    if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
+      sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
+      sink->external_socket = TRUE;
+    } else {
+      sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
+      sink->external_socket = TRUE;
+    }
+  }
+
+  if (sink->socket_v6) {
+    GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
+    g_return_val_if_fail (g_socket_get_family (sink->socket) !=
+        G_SOCKET_FAMILY_IPV6, FALSE);
+
+    if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
+      GST_ERROR_OBJECT (sink,
+          "Provided different IPv6 sockets in socket and socket-v6 properties");
+      return FALSE;
+    }
+
+    sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
     sink->external_socket = TRUE;
   }
 
+  if (!sink->used_socket && !sink->used_socket_v6) {
+    /* create sender sockets if none available */
+
+    if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
+                G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
+      goto no_socket;
+
+    if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
+                G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
+      GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s", err->message);
+      g_clear_error (&err);
+    }
+  }
 #ifdef SO_SNDBUF
   {
     socklen_t len;
@@ -903,35 +989,65 @@ gst_multiudpsink_start (GstBaseSink * bsink)
       /* set buffer size, Note that on Linux this is typically limited to a
        * maximum of around 100K. Also a minimum of 128 bytes is required on
        * Linux. */
-      ret =
-          setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
-          SO_SNDBUF, (void *) &sndsize, len);
-      if (ret != 0) {
-        GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
-            ("Could not create a buffer of requested %d bytes, %d: %s",
-                sndsize, ret, g_strerror (errno)));
+
+      if (sink->used_socket) {
+        ret =
+            setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
+            SO_SNDBUF, (void *) &sndsize, len);
+        if (ret != 0) {
+          GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
+              ("Could not create a buffer of requested %d bytes, %d: %s",
+                  sndsize, ret, g_strerror (errno)));
+        }
+      }
+
+      if (sink->used_socket_v6) {
+        ret =
+            setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
+            SO_SNDBUF, (void *) &sndsize, len);
+        if (ret != 0) {
+          GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
+              ("Could not create a buffer of requested %d bytes, %d: %s",
+                  sndsize, ret, g_strerror (errno)));
+        }
       }
     }
 
     /* read the value of the receive buffer. Note that on linux this returns 2x the
      * value we set because the kernel allocates extra memory for metadata.
      * The default on Linux is about 100K (which is about 50K without metadata) */
-    ret =
-        getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, SO_SNDBUF,
-        (void *) &sndsize, &len);
-    if (ret == 0)
-      GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize);
-    else
-      GST_DEBUG_OBJECT (sink, "could not get udp buffer size");
+    if (sink->used_socket) {
+      ret =
+          getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
+          SO_SNDBUF, (void *) &sndsize, &len);
+      if (ret == 0)
+        GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
+      else
+        GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
+    }
+
+    if (sink->used_socket_v6) {
+      ret =
+          getsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
+          SO_SNDBUF, (void *) &sndsize, &len);
+      if (ret == 0)
+        GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
+      else
+        GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
+    }
   }
 #endif
 
-  g_socket_set_broadcast (sink->used_socket, TRUE);
+  if (sink->used_socket)
+    g_socket_set_broadcast (sink->used_socket, TRUE);
+  if (sink->used_socket_v6)
+    g_socket_set_broadcast (sink->used_socket_v6, TRUE);
 
   sink->bytes_to_serve = 0;
   sink->bytes_served = 0;
 
-  gst_multiudpsink_setup_qos_dscp (sink);
+  gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
+  gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
 
   /* look for multicast clients and join multicast groups appropriately
      set also ttl and multicast loopback delivery appropriately  */
@@ -974,6 +1090,20 @@ gst_multiudpsink_stop (GstBaseSink * bsink)
     udpsink->used_socket = NULL;
   }
 
+  if (udpsink->used_socket_v6) {
+    if (udpsink->close_socket || !udpsink->external_socket) {
+      GError *err = NULL;
+
+      if (!g_socket_close (udpsink->used_socket_v6, &err)) {
+        GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
+        g_clear_error (&err);
+      }
+    }
+
+    g_object_unref (udpsink->used_socket_v6);
+    udpsink->used_socket_v6 = NULL;
+  }
+
   return TRUE;
 }
 
@@ -1069,17 +1199,26 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
   if (client->refcount == 0) {
     GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
     GInetAddress *addr = g_inet_socket_address_get_address (saddr);
+    GSocketFamily family =
+        g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
+    GSocket *socket;
+
+    /* Select socket to send from for this address */
+    if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
+      socket = sink->used_socket_v6;
+    else
+      socket = sink->used_socket;
 
     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
 
     g_get_current_time (&now);
     client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
 
-    if (sink->used_socket && sink->auto_multicast
+    if (socket && sink->auto_multicast
         && g_inet_address_get_is_multicast (addr)) {
       GError *err = NULL;
 
-      if (!g_socket_leave_multicast_group (sink->used_socket, addr, FALSE,
+      if (!g_socket_leave_multicast_group (socket, addr, FALSE,
               sink->multi_iface, &err)) {
         GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
             err->message);
index a7f6b0b..cb84571 100644 (file)
@@ -56,7 +56,7 @@ typedef struct {
 struct _GstMultiUDPSink {
   GstBaseSink parent;
 
-  GSocket       *used_socket;
+  GSocket       *used_socket, *used_socket_v6;
   GCancellable  *cancellable;
 
   GMutex         client_lock;
@@ -68,7 +68,7 @@ struct _GstMultiUDPSink {
   /* properties */
   guint64        bytes_to_serve;
   guint64        bytes_served;
-  GSocket       *socket;
+  GSocket       *socket, *socket_v6;
   gboolean       close_socket;
 
   gboolean       external_socket;