From b1af93f791909c883f0da97b784d3e03e810bdcf Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 25 Apr 2013 12:12:23 +0200 Subject: [PATCH] (multi)udpsink: Use separate sockets for IPv4 and IPv6 https://bugzilla.gnome.org/show_bug.cgi?id=534243 --- gst/udp/gstmultiudpsink.c | 227 +++++++++++++++++++++++++++++++++++++--------- gst/udp/gstmultiudpsink.h | 4 +- 2 files changed, 185 insertions(+), 46 deletions(-) diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index dc51a10..c378ded 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -99,8 +99,10 @@ enum PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, PROP_SOCKET, + PROP_SOCKET_V6, PROP_CLOSE_SOCKET, PROP_USED_SOCKET, + PROP_USED_SOCKET_V6, PROP_CLIENTS, PROP_AUTO_MULTICAST, PROP_MULTICAST_IFACE, @@ -259,6 +261,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) g_param_spec_object ("socket", "Socket Handle", "Socket to use for UDP sending. (NULL == allocate)", G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SOCKET_V6, + g_param_spec_object ("socket-v6", "Socket Handle IPv6", + "Socket to use for UDPv6 sending. (NULL == allocate)", + G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET, g_param_spec_boolean ("close-socket", "Close socket", "Close socket if passed as property on state change", @@ -267,6 +273,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) g_param_spec_object ("used-socket", "Used Socket Handle", "Socket currently in use for UDP sending. (NULL == no socket)", G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6, + g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6", + "Socket currently in use for UDPv6 sending. (NULL == no socket)", + G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CLIENTS, g_param_spec_string ("clients", "Clients", "A comma separated list of host:port pairs with destinations", @@ -303,8 +313,8 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) */ g_object_class_install_property (gobject_class, PROP_FORCE_IPV4, g_param_spec_boolean ("force-ipv4", "Force IPv4", - "Forcing the use of an IPv4 socket", DEFAULT_FORCE_IPV4, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)", + DEFAULT_FORCE_IPV4, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP, g_param_spec_int ("qos-dscp", "QoS diff srv code point", @@ -359,7 +369,9 @@ gst_multiudpsink_init (GstMultiUDPSink * sink) g_mutex_init (&sink->client_lock); sink->socket = DEFAULT_SOCKET; + sink->socket_v6 = DEFAULT_SOCKET; sink->used_socket = DEFAULT_USED_SOCKET; + sink->used_socket_v6 = DEFAULT_USED_SOCKET; sink->close_socket = DEFAULT_CLOSE_SOCKET; sink->external_socket = (sink->socket != NULL); sink->auto_multicast = DEFAULT_AUTO_MULTICAST; @@ -461,10 +473,18 @@ gst_multiudpsink_finalize (GObject * object) g_object_unref (sink->socket); sink->socket = NULL; + if (sink->socket_v6) + g_object_unref (sink->socket_v6); + sink->socket_v6 = NULL; + if (sink->used_socket) g_object_unref (sink->used_socket); sink->used_socket = NULL; + if (sink->used_socket_v6) + g_object_unref (sink->used_socket_v6); + sink->used_socket_v6 = NULL; + if (sink->cancellable) g_object_unref (sink->cancellable); sink->cancellable = NULL; @@ -527,6 +547,8 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) num = 0; for (clients = sink->clients; clients; clients = g_list_next (clients)) { GstUDPClient *client; + GSocket *socket; + GSocketFamily family; gint count; client = (GstUDPClient *) clients->data; @@ -534,13 +556,20 @@ gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) GST_LOG_OBJECT (sink, "sending %" G_GSIZE_FORMAT " bytes to client %p", size, client); + family = g_socket_address_get_family (G_SOCKET_ADDRESS (client->addr)); + /* Select socket to send from for this address */ + if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket) + socket = sink->used_socket_v6; + else + socket = sink->used_socket; + count = sink->send_duplicates ? client->refcount : 1; while (count--) { gssize ret; ret = - g_socket_send_message (sink->used_socket, client->addr, vec, n_mem, + g_socket_send_message (socket, client->addr, vec, n_mem, NULL, 0, 0, sink->cancellable, &err); if (G_UNLIKELY (ret < 0)) { @@ -655,13 +684,13 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) } static void -gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink) +gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket) { /* don't touch on -1 */ if (sink->qos_dscp < 0) return; - if (sink->used_socket == NULL) + if (socket == NULL) return; #ifdef IP_TOS @@ -669,7 +698,7 @@ gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink) gint tos; gint fd; - fd = g_socket_get_fd (sink->used_socket); + fd = g_socket_get_fd (socket); GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp); @@ -713,6 +742,23 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id, udpsink->socket = g_value_dup_object (value); GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket); break; + case PROP_SOCKET_V6: + if (udpsink->socket_v6 != NULL + && udpsink->socket_v6 != udpsink->used_socket_v6 + && udpsink->close_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->socket_v6, &err)) { + GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6, + err->message); + g_clear_error (&err); + } + } + if (udpsink->socket_v6) + g_object_unref (udpsink->socket_v6); + udpsink->socket_v6 = g_value_dup_object (value); + GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6); + break; case PROP_CLOSE_SOCKET: udpsink->close_socket = g_value_get_boolean (value); break; @@ -744,7 +790,8 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id, break; case PROP_QOS_DSCP: udpsink->qos_dscp = g_value_get_int (value); - gst_multiudpsink_setup_qos_dscp (udpsink); + gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket); + gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6); break; case PROP_SEND_DUPLICATES: udpsink->send_duplicates = g_value_get_boolean (value); @@ -825,25 +872,36 @@ gst_multiudpsink_configure_client (GstMultiUDPSink * sink, { GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr)); + GSocket *socket; GError *err = NULL; GST_DEBUG_OBJECT (sink, "configuring client %p", client); + if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6) + goto invalid_family; + + /* Select socket to send from for this address */ + if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket) + socket = sink->used_socket_v6; + else + socket = sink->used_socket; + if (g_inet_address_get_is_multicast (addr)) { GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client); if (sink->auto_multicast) { GST_DEBUG_OBJECT (sink, "autojoining group"); - if (!g_socket_join_multicast_group (sink->used_socket, addr, FALSE, + if (!g_socket_join_multicast_group (socket, addr, FALSE, sink->multi_iface, &err)) goto join_group_failed; } GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop); - g_socket_set_multicast_loopback (sink->used_socket, sink->loop); + g_socket_set_multicast_loopback (socket, sink->loop); GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc); - g_socket_set_multicast_ttl (sink->used_socket, sink->ttl_mc); + g_socket_set_multicast_ttl (socket, sink->ttl_mc); } else { GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl); - g_socket_set_ttl (sink->used_socket, sink->ttl); + g_socket_set_ttl (socket, sink->ttl); } return TRUE; @@ -857,6 +915,13 @@ join_group_failed: g_clear_error (&err); return FALSE; } +invalid_family: + { + gst_multiudpsink_stop (GST_BASE_SINK (sink)); + GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), + ("Invalid address family (got %d)", family)); + return FALSE; + } } /* create a socket for sending to remote machine */ @@ -870,26 +935,47 @@ gst_multiudpsink_start (GstBaseSink * bsink) sink = GST_MULTIUDPSINK (bsink); - if (sink->socket == NULL) { - GST_DEBUG_OBJECT (sink, "creating sockets"); - /* create sender socket try IP6, fall back to IP4 */ - if (sink->force_ipv4 || (sink->used_socket = - g_socket_new (G_SOCKET_FAMILY_IPV6, - G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) { - if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, - G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) - goto no_socket; - } + sink->external_socket = FALSE; - GST_DEBUG_OBJECT (sink, "have socket"); - sink->external_socket = FALSE; - } else { + if (sink->socket) { GST_DEBUG_OBJECT (sink, "using configured socket"); - /* we use the configured socket */ - sink->used_socket = G_SOCKET (g_object_ref (sink->socket)); + if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) { + sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket)); + sink->external_socket = TRUE; + } else { + sink->used_socket = G_SOCKET (g_object_ref (sink->socket)); + sink->external_socket = TRUE; + } + } + + if (sink->socket_v6) { + GST_DEBUG_OBJECT (sink, "using configured IPv6 socket"); + g_return_val_if_fail (g_socket_get_family (sink->socket) != + G_SOCKET_FAMILY_IPV6, FALSE); + + if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) { + GST_ERROR_OBJECT (sink, + "Provided different IPv6 sockets in socket and socket-v6 properties"); + return FALSE; + } + + sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6)); sink->external_socket = TRUE; } + if (!sink->used_socket && !sink->used_socket_v6) { + /* create sender sockets if none available */ + + if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) + goto no_socket; + + if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) { + GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s", err->message); + g_clear_error (&err); + } + } #ifdef SO_SNDBUF { socklen_t len; @@ -903,35 +989,65 @@ gst_multiudpsink_start (GstBaseSink * bsink) /* set buffer size, Note that on Linux this is typically limited to a * maximum of around 100K. Also a minimum of 128 bytes is required on * Linux. */ - ret = - setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, - SO_SNDBUF, (void *) &sndsize, len); - if (ret != 0) { - GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), - ("Could not create a buffer of requested %d bytes, %d: %s", - sndsize, ret, g_strerror (errno))); + + if (sink->used_socket) { + ret = + setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, len); + if (ret != 0) { + GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), + ("Could not create a buffer of requested %d bytes, %d: %s", + sndsize, ret, g_strerror (errno))); + } + } + + if (sink->used_socket_v6) { + ret = + setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, len); + if (ret != 0) { + GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), + ("Could not create a buffer of requested %d bytes, %d: %s", + sndsize, ret, g_strerror (errno))); + } } } /* read the value of the receive buffer. Note that on linux this returns 2x the * value we set because the kernel allocates extra memory for metadata. * The default on Linux is about 100K (which is about 50K without metadata) */ - ret = - getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, SO_SNDBUF, - (void *) &sndsize, &len); - if (ret == 0) - GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize); - else - GST_DEBUG_OBJECT (sink, "could not get udp buffer size"); + if (sink->used_socket) { + ret = + getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, &len); + if (ret == 0) + GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize); + else + GST_DEBUG_OBJECT (sink, "could not get UDP buffer size"); + } + + if (sink->used_socket_v6) { + ret = + getsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, &len); + if (ret == 0) + GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize); + else + GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size"); + } } #endif - g_socket_set_broadcast (sink->used_socket, TRUE); + if (sink->used_socket) + g_socket_set_broadcast (sink->used_socket, TRUE); + if (sink->used_socket_v6) + g_socket_set_broadcast (sink->used_socket_v6, TRUE); sink->bytes_to_serve = 0; sink->bytes_served = 0; - gst_multiudpsink_setup_qos_dscp (sink); + gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket); + gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6); /* look for multicast clients and join multicast groups appropriately set also ttl and multicast loopback delivery appropriately */ @@ -974,6 +1090,20 @@ gst_multiudpsink_stop (GstBaseSink * bsink) udpsink->used_socket = NULL; } + if (udpsink->used_socket_v6) { + if (udpsink->close_socket || !udpsink->external_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->used_socket_v6, &err)) { + GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + } + + g_object_unref (udpsink->used_socket_v6); + udpsink->used_socket_v6 = NULL; + } + return TRUE; } @@ -1069,17 +1199,26 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) if (client->refcount == 0) { GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GSocketFamily family = + g_socket_address_get_family (G_SOCKET_ADDRESS (saddr)); + GSocket *socket; + + /* Select socket to send from for this address */ + if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket) + socket = sink->used_socket_v6; + else + socket = sink->used_socket; GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); - if (sink->used_socket && sink->auto_multicast + if (socket && sink->auto_multicast && g_inet_address_get_is_multicast (addr)) { GError *err = NULL; - if (!g_socket_leave_multicast_group (sink->used_socket, addr, FALSE, + if (!g_socket_leave_multicast_group (socket, addr, FALSE, sink->multi_iface, &err)) { GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s", err->message); diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h index a7f6b0b..cb84571 100644 --- a/gst/udp/gstmultiudpsink.h +++ b/gst/udp/gstmultiudpsink.h @@ -56,7 +56,7 @@ typedef struct { struct _GstMultiUDPSink { GstBaseSink parent; - GSocket *used_socket; + GSocket *used_socket, *used_socket_v6; GCancellable *cancellable; GMutex client_lock; @@ -68,7 +68,7 @@ struct _GstMultiUDPSink { /* properties */ guint64 bytes_to_serve; guint64 bytes_served; - GSocket *socket; + GSocket *socket, *socket_v6; gboolean close_socket; gboolean external_socket; -- 2.7.4