X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fudp%2Fgstmultiudpsink.c;h=7f4f03040f4dc90655611b312064bb810b1a2f4c;hb=3d61d12e03f8c661552454c351e40b1c2da95470;hp=5ee6d516ce965b3defd1544ed07570917b47049f;hpb=33f18b8ea49f58367402abb9099abaa03b34f93c;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index 5ee6d51..7f4f030 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -1,6 +1,8 @@ /* GStreamer * Copyright (C) <2007> Wim Taymans * Copyright (C) <2009> Jarkko Palviainen + * Copyright (C) <2012> Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -27,19 +29,22 @@ * It can be combined with rtp payload encoders to implement RTP streaming. */ +/* FIXME 0.11: suppress warnings for deprecated API such as GValueArray + * with newer GLib versions (>= 2.31.0) */ +#define GLIB_DISABLE_DEPRECATION_WARNINGS + #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "gstudp-marshal.h" #include "gstmultiudpsink.h" -#include -#include -#ifdef HAVE_UNISTD_H -#include -#endif -#include #include +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#include "gst/glib-compat-private.h" GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug); #define GST_CAT_DEFAULT (multiudpsink_debug) @@ -68,11 +73,11 @@ enum LAST_SIGNAL }; -#define DEFAULT_SOCKFD -1 -#define DEFAULT_CLOSEFD TRUE -#define DEFAULT_SOCK -1 +#define DEFAULT_SOCKET NULL +#define DEFAULT_CLOSE_SOCKET TRUE +#define DEFAULT_USED_SOCKET NULL #define DEFAULT_CLIENTS NULL -#define DEFAULT_FAMILY 0 +#define DEFAULT_FAMILY G_SOCKET_FAMILY_IPV6 /* FIXME, this should be disabled by default, we don't need to join a multicast * group for sending, if this socket is also used for receiving, it should * be configured in the element that does the receive. */ @@ -89,9 +94,9 @@ enum PROP_0, PROP_BYTES_TO_SERVE, PROP_BYTES_SERVED, - PROP_SOCKFD, - PROP_CLOSEFD, - PROP_SOCK, + PROP_SOCKET, + PROP_CLOSE_SOCKET, + PROP_USED_SOCKET, PROP_CLIENTS, PROP_AUTO_MULTICAST, PROP_TTL, @@ -103,28 +108,15 @@ enum PROP_LAST }; -#define CLOSE_IF_REQUESTED(udpctx) \ -G_STMT_START { \ - if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \ - CLOSE_SOCKET(udpctx->sock); \ - if (udpctx->sock == udpctx->sockfd) \ - udpctx->sockfd = DEFAULT_SOCKFD; \ - } \ - udpctx->sock = DEFAULT_SOCK; \ -} G_STMT_END - static void gst_multiudpsink_finalize (GObject * object); static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink, GstBuffer * buffer); -#if 0 -#ifndef G_OS_WIN32 /* sendmsg() is not available on Windows */ -static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink, - GstBufferList * list); -#endif -#endif -static GstStateChangeReturn gst_multiudpsink_change_state (GstElement * - element, GstStateChange transition); + +static gboolean gst_multiudpsink_start (GstBaseSink * bsink); +static gboolean gst_multiudpsink_stop (GstBaseSink * bsink); +static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink); +static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink); static void gst_multiudpsink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -152,8 +144,6 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) gstelement_class = (GstElementClass *) klass; gstbasesink_class = (GstBaseSinkClass *) klass; - parent_class = g_type_class_peek_parent (klass); - gobject_class->set_property = gst_multiudpsink_set_property; gobject_class->get_property = gst_multiudpsink_get_property; gobject_class->finalize = gst_multiudpsink_finalize; @@ -213,14 +203,14 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) * * Get the statistics of the client with destination @host and @port. * - * Returns: a GValueArray of uint64: bytes_sent, packets_sent, + * Returns: a GstStructure: bytes_sent, packets_sent, * connect_time (in epoch seconds), disconnect_time (in epoch seconds) */ gst_multiudpsink_signals[SIGNAL_GET_STATS] = g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats), - NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2, + NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, GST_TYPE_STRUCTURE, 2, G_TYPE_STRING, G_TYPE_INT); /** * GstMultiUDPSink::client-added: @@ -257,22 +247,20 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED, g_param_spec_uint64 ("bytes-served", "Bytes served", - "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SOCKFD, - g_param_spec_int ("sockfd", "Socket Handle", - "Socket to use for UDP sending. (-1 == allocate)", - -1, G_MAXINT, DEFAULT_SOCKFD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_CLOSEFD, - g_param_spec_boolean ("closefd", "Close sockfd", - "Close sockfd if passed as property on state change", - DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SOCK, - g_param_spec_int ("sock", "Socket Handle", - "Socket currently in use for UDP sending. (-1 == no socket)", - -1, G_MAXINT, DEFAULT_SOCK, + "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SOCKET, + g_param_spec_object ("socket", "Socket Handle", + "Socket to use for UDP sending. (NULL == allocate)", + G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET, + g_param_spec_boolean ("close-socket", "Close socket", + "Close socket if passed as property on state change", + DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_USED_SOCKET, + g_param_spec_object ("used-socket", "Used Socket Handle", + "Socket currently in use for UDP sending. (NULL == no socket)", + G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CLIENTS, g_param_spec_string ("clients", "Clients", "A comma separated list of host:port pairs with destinations", @@ -320,8 +308,6 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT, DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - gstelement_class->change_state = gst_multiudpsink_change_state; - gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sink_template)); @@ -331,11 +317,10 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Wim Taymans "); gstbasesink_class->render = gst_multiudpsink_render; -#if 0 -#ifndef G_OS_WIN32 - gstbasesink_class->render_list = gst_multiudpsink_render_list; -#endif -#endif + gstbasesink_class->start = gst_multiudpsink_start; + gstbasesink_class->stop = gst_multiudpsink_stop; + gstbasesink_class->unlock = gst_multiudpsink_unlock; + gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop; klass->add = gst_multiudpsink_add; klass->remove = gst_multiudpsink_remove; klass->clear = gst_multiudpsink_clear; @@ -348,38 +333,74 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) static void gst_multiudpsink_init (GstMultiUDPSink * sink) { - WSA_STARTUP (sink); - - sink->client_lock = g_mutex_new (); - sink->sock = DEFAULT_SOCK; - sink->sockfd = DEFAULT_SOCKFD; - sink->closefd = DEFAULT_CLOSEFD; - sink->externalfd = (sink->sockfd != -1); + g_mutex_init (&sink->client_lock); + sink->socket = DEFAULT_SOCKET; + sink->used_socket = DEFAULT_USED_SOCKET; + sink->close_socket = DEFAULT_CLOSE_SOCKET; + sink->external_socket = (sink->socket != NULL); sink->auto_multicast = DEFAULT_AUTO_MULTICAST; sink->ttl = DEFAULT_TTL; sink->ttl_mc = DEFAULT_TTL_MC; sink->loop = DEFAULT_LOOP; sink->qos_dscp = DEFAULT_QOS_DSCP; - sink->ss_family = DEFAULT_FAMILY; + sink->family = DEFAULT_FAMILY; sink->send_duplicates = DEFAULT_SEND_DUPLICATES; + + sink->cancellable = g_cancellable_new (); } static GstUDPClient * create_client (GstMultiUDPSink * sink, const gchar * host, gint port) { GstUDPClient *client; + GInetAddress *addr; + GResolver *resolver; + GError *err = NULL; + + addr = g_inet_address_new_from_string (host); + if (!addr) { + GList *results; + + resolver = g_resolver_get_default (); + results = + g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err); + if (!results) + goto name_resolve; + addr = G_INET_ADDRESS (g_object_ref (results->data)); + + g_resolver_free_addresses (results); + g_object_unref (resolver); + } +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *ip = g_inet_address_to_string (addr); + + GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip); + g_free (ip); + } +#endif client = g_slice_new0 (GstUDPClient); client->refcount = 1; client->host = g_strdup (host); client->port = port; + client->addr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); return client; + +name_resolve: + { + g_object_unref (resolver); + + return NULL; + } } static void free_client (GstUDPClient * client) { + g_object_unref (client->addr); g_free (client->host); g_slice_free (GstUDPClient, client); } @@ -403,240 +424,110 @@ gst_multiudpsink_finalize (GObject * object) g_list_foreach (sink->clients, (GFunc) free_client, NULL); g_list_free (sink->clients); - if (sink->sockfd >= 0 && sink->closefd) - CLOSE_SOCKET (sink->sockfd); - - g_mutex_free (sink->client_lock); + if (sink->socket) + g_object_unref (sink->socket); + sink->socket = NULL; - WSA_CLEANUP (object); + if (sink->used_socket) + g_object_unref (sink->used_socket); + sink->used_socket = NULL; - G_OBJECT_CLASS (parent_class)->finalize (object); -} + if (sink->cancellable) + g_object_unref (sink->cancellable); + sink->cancellable = NULL; -static gboolean -socket_error_is_ignorable (void) -{ -#ifdef G_OS_WIN32 - /* Windows doesn't seem to have an EAGAIN for sockets */ - return WSAGetLastError () == WSAEINTR; -#else - return errno == EINTR || errno == EAGAIN; -#endif -} + g_mutex_clear (&sink->client_lock); -static int -socket_last_error_code (void) -{ -#ifdef G_OS_WIN32 - return WSAGetLastError (); -#else - return errno; -#endif -} - -static gchar * -socket_last_error_message (void) -{ -#ifdef G_OS_WIN32 - int errorcode = WSAGetLastError (); - wchar_t buf[1024]; - DWORD result = - FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, errorcode, 0, (LPSTR) buf, sizeof (buf) / sizeof (wchar_t), NULL); - if (FAILED (result)) { - return g_strdup ("failed to get error message from system"); - } else { - gchar *res = - g_convert ((gchar *) buf, -1, "UTF-16", "UTF-8", NULL, NULL, NULL); - /* g_convert() internally calls windows functions which reset the - windows error code, so fix it up again like this */ - WSASetLastError (errorcode); - return res; - } -#else - return g_strdup (g_strerror (errno)); -#endif + G_OBJECT_CLASS (parent_class)->finalize (object); } -#ifdef G_OS_WIN32 -/* version without sendmsg */ -static GstFlowReturn -gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) -{ - GstMultiUDPSink *sink; - gint ret, num = 0, no_clients = 0; - gsize size; - guint8 *data; - GList *clients; - gint len; - - sink = GST_MULTIUDPSINK (bsink); - - data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ); - - if (size > UDP_MAX_SIZE) { - GST_WARNING ("Attempting to send a UDP packet larger than maximum " - "size (%d > %d)", size, UDP_MAX_SIZE); - } - - sink->bytes_to_serve += size; - - /* grab lock while iterating and sending to clients, this should be - * fast as UDP never blocks */ - g_mutex_lock (sink->client_lock); - GST_LOG_OBJECT (bsink, "about to send %d bytes", size); - - for (clients = sink->clients; clients; clients = g_list_next (clients)) { - GstUDPClient *client; - gint count; - - client = (GstUDPClient *) clients->data; - no_clients++; - GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); - - count = sink->send_duplicates ? client->refcount : 1; - - while (count--) { - while (TRUE) { - len = gst_udp_get_sockaddr_length (&client->theiraddr); - - ret = sendto (*client->sock, -#ifdef G_OS_WIN32 - (char *) data, -#else - data, -#endif - size, 0, (struct sockaddr *) &client->theiraddr, len); - - if (ret < 0) { - /* some error, just warn, it's likely recoverable and we don't want to - * break streaming. We break so that we stop retrying for this client. */ - if (!socket_error_is_ignorable ()) { - gchar *errormessage = socket_last_error_message (); - GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, - socket_last_error_code (), errormessage); - g_free (errormessage); - break; - } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; - } - } - } - } - g_mutex_unlock (sink->client_lock); - - gst_buffer_unmap (buffer, data, size); - - GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, - no_clients); - - return GST_FLOW_OK; -} -#else /* !G_OS_WIN32 */ -/* version with sendmsg */ static GstFlowReturn gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstMultiUDPSink *sink; GList *clients; - gint ret, size = 0, num = 0, no_clients = 0; - struct iovec *iov; - struct msghdr msg = { 0 }; + GOutputVector *vec; + GstMapInfo *map; guint n_mem, i; - gpointer bdata; - gsize bsize; + gsize size; GstMemory *mem; + gint num, no_clients; + GError *err = NULL; sink = GST_MULTIUDPSINK (bsink); - msg.msg_iovlen = 0; - size = 0; - n_mem = gst_buffer_n_memory (buffer); if (n_mem == 0) goto no_data; - iov = (struct iovec *) g_malloc (n_mem * sizeof (struct iovec)); - msg.msg_iov = iov; + vec = g_new (GOutputVector, n_mem); + map = g_new (GstMapInfo, n_mem); + size = 0; for (i = 0; i < n_mem; i++) { - mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); - bdata = gst_memory_map (mem, &bsize, NULL, GST_MAP_READ); + mem = gst_buffer_get_memory (buffer, i); + gst_memory_map (mem, &map[i], GST_MAP_READ); - if (bsize > UDP_MAX_SIZE) { + if (map[i].size > UDP_MAX_SIZE) { GST_WARNING ("Attempting to send a UDP packet larger than maximum " - "size (%d > %d)", bsize, UDP_MAX_SIZE); + "size (%" G_GSIZE_FORMAT " > %d)", map[i].size, UDP_MAX_SIZE); } - msg.msg_iov[msg.msg_iovlen].iov_len = bsize; - msg.msg_iov[msg.msg_iovlen].iov_base = bdata; - msg.msg_iovlen++; + vec[i].buffer = map[i].data; + vec[i].size = map[i].size; - size += bsize; + size += map[i].size; } sink->bytes_to_serve += size; /* grab lock while iterating and sending to clients, this should be * fast as UDP never blocks */ - g_mutex_lock (sink->client_lock); - GST_LOG_OBJECT (bsink, "about to send %d bytes", size); + g_mutex_lock (&sink->client_lock); + GST_LOG_OBJECT (bsink, "about to send %" G_GSIZE_FORMAT " bytes", size); + no_clients = 0; + num = 0; for (clients = sink->clients; clients; clients = g_list_next (clients)) { GstUDPClient *client; gint count; client = (GstUDPClient *) clients->data; no_clients++; - GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client); + GST_LOG_OBJECT (sink, "sending %" G_GSIZE_FORMAT " bytes to client %p", + size, client); count = sink->send_duplicates ? client->refcount : 1; while (count--) { - while (TRUE) { - msg.msg_name = (void *) &client->theiraddr; - msg.msg_namelen = sizeof (client->theiraddr); - ret = sendmsg (*client->sock, &msg, 0); - - if (ret < 0) { - if (!socket_error_is_ignorable ()) { - gchar *errormessage = socket_last_error_message (); - GST_WARNING_OBJECT (sink, "client %p gave error %d (%s)", client, - socket_last_error_code (), errormessage); - g_free (errormessage); - break; - break; - } - } else { - num++; - client->bytes_sent += ret; - client->packets_sent++; - sink->bytes_served += ret; - break; - } - } + gssize ret; + + ret = + g_socket_send_message (sink->used_socket, client->addr, vec, n_mem, + NULL, 0, 0, sink->cancellable, &err); + + if (ret < 0) + goto send_error; + + num++; + client->bytes_sent += ret; + client->packets_sent++; + sink->bytes_served += ret; } } - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); /* unmap all memory again */ for (i = 0; i < n_mem; i++) { - mem = gst_buffer_peek_memory (buffer, i, GST_MAP_READ); - - bsize = msg.msg_iov[i].iov_len; - bdata = msg.msg_iov[i].iov_base; - - gst_memory_unmap (mem, bdata, bsize); + gst_memory_unmap (map[i].memory, &map[i]); + gst_memory_unref (map[i].memory); } - g_free (iov); - GST_LOG_OBJECT (sink, "sent %d bytes to %d (of %d) clients", size, num, - no_clients); + g_free (vec); + g_free (map); + + GST_LOG_OBJECT (sink, "sent %" G_GSIZE_FORMAT " bytes to %d (of %d) clients", + size, num, no_clients); return GST_FLOW_OK; @@ -644,17 +535,14 @@ no_data: { return GST_FLOW_OK; } +send_error: + { + g_mutex_unlock (&sink->client_lock); + GST_DEBUG ("got send error %s", err->message); + g_clear_error (&err); + return GST_FLOW_ERROR; + } } -#endif - -#if 0 -/* DISABLED, core sends buffers to our render one by one, we can't really do - * much better */ -static GstFlowReturn -gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * list) -{ -} -#endif static void gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink, @@ -665,23 +553,23 @@ gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink, clients = g_strsplit (string, ",", 0); - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); /* clear all existing clients */ gst_multiudpsink_clear_internal (sink, FALSE); for (i = 0; clients[i]; i++) { gchar *host, *p; - gint port = 0; + gint64 port = 0; host = clients[i]; p = strstr (clients[i], ":"); if (p != NULL) { *p = '\0'; - port = atoi (p + 1); + port = g_ascii_strtoll (p + 1, NULL, 10); } if (port != 0) gst_multiudpsink_add_internal (sink, host, port, FALSE); } - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); g_strfreev (clients); } @@ -694,7 +582,7 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) str = g_string_new (""); - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); clients = sink->clients; while (clients) { GstUDPClient *client; @@ -710,7 +598,7 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) (clients || count > 1 ? "," : "")); } } - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); return g_string_free (str, FALSE); } @@ -718,33 +606,35 @@ gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink) static void gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink) { - gint tos; - /* don't touch on -1 */ if (sink->qos_dscp < 0) return; - if (sink->sock < 0) + if (sink->used_socket == NULL) return; - GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp); +#ifdef IP_TOS + { + gint tos; + gint fd; + + fd = g_socket_get_fd (sink->used_socket); - /* Extract and shift 6 bits of DSFIELD */ - tos = (sink->qos_dscp & 0x3f) << 2; + GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp); - if (setsockopt (sink->sock, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) { - gchar *errormessage = socket_last_error_message (); - GST_ERROR_OBJECT (sink, "could not set TOS: %s", errormessage); - g_free (errormessage); - } + /* Extract and shift 6 bits of DSFIELD */ + tos = (sink->qos_dscp & 0x3f) << 2; + + if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) { + GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno)); + } #ifdef IPV6_TCLASS - if (setsockopt (sink->sock, IPPROTO_IPV6, IPV6_TCLASS, &tos, - sizeof (tos)) < 0) { - gchar *errormessage = socket_last_error_message (); - GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", errormessage); - g_free (errormessage); + if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0) { + GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno)); + } } #endif +#endif } static void @@ -756,15 +646,24 @@ gst_multiudpsink_set_property (GObject * object, guint prop_id, udpsink = GST_MULTIUDPSINK (object); switch (prop_id) { - case PROP_SOCKFD: - if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock && - udpsink->closefd) - CLOSE_SOCKET (udpsink->sockfd); - udpsink->sockfd = g_value_get_int (value); - GST_DEBUG_OBJECT (udpsink, "setting SOCKFD to %d", udpsink->sockfd); + case PROP_SOCKET: + if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket && + udpsink->close_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->socket, &err)) { + GST_ERROR ("failed to close socket %p: %s", udpsink->socket, + err->message); + g_clear_error (&err); + } + } + if (udpsink->socket) + g_object_unref (udpsink->socket); + udpsink->socket = g_value_dup_object (value); + GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket); break; - case PROP_CLOSEFD: - udpsink->closefd = g_value_get_boolean (value); + case PROP_CLOSE_SOCKET: + udpsink->close_socket = g_value_get_boolean (value); break; case PROP_CLIENTS: gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value)); @@ -812,14 +711,14 @@ gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_BYTES_SERVED: g_value_set_uint64 (value, udpsink->bytes_served); break; - case PROP_SOCKFD: - g_value_set_int (value, udpsink->sockfd); + case PROP_SOCKET: + g_value_set_object (value, udpsink->socket); break; - case PROP_CLOSEFD: - g_value_set_boolean (value, udpsink->closefd); + case PROP_CLOSE_SOCKET: + g_value_set_boolean (value, udpsink->close_socket); break; - case PROP_SOCK: - g_value_set_int (value, udpsink->sock); + case PROP_USED_SOCKET: + g_value_set_object (value, udpsink->used_socket); break; case PROP_CLIENTS: g_value_take_string (value, @@ -856,133 +755,112 @@ static gboolean gst_multiudpsink_configure_client (GstMultiUDPSink * sink, GstUDPClient * client) { + GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); + GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GError *err = NULL; + GST_DEBUG_OBJECT (sink, "configuring client %p", client); - if (gst_udp_is_multicast (&client->theiraddr)) { + if (g_inet_address_get_is_multicast (addr)) { GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client); if (sink->auto_multicast) { GST_DEBUG_OBJECT (sink, "autojoining group"); - if (gst_udp_join_group (*(client->sock), &client->theiraddr, NULL) - != 0) + if (!g_socket_join_multicast_group (sink->used_socket, addr, FALSE, NULL, + &err)) goto join_group_failed; } GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop); - if (gst_udp_set_loop (sink->sock, sink->ss_family, sink->loop) != 0) - goto loop_failed; + g_socket_set_multicast_loopback (sink->used_socket, sink->loop); GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc); - if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl_mc, TRUE) != 0) - goto ttl_failed; + g_socket_set_multicast_ttl (sink->used_socket, sink->ttl_mc); } else { GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl); - if (gst_udp_set_ttl (sink->sock, sink->ss_family, sink->ttl, FALSE) != 0) - goto ttl_failed; + g_socket_set_ttl (sink->used_socket, sink->ttl); } return TRUE; /* ERRORS */ join_group_failed: { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not join multicast group (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -ttl_failed: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set TTL socket option (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -loop_failed: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); + gst_multiudpsink_stop (GST_BASE_SINK (sink)); GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set loopback socket option (%d): %s", - errorcode, errormessage)); - g_free (errormessage); + ("Could not join multicast group: %s", err->message)); + g_clear_error (&err); return FALSE; } } /* create a socket for sending to remote machine */ static gboolean -gst_multiudpsink_init_send (GstMultiUDPSink * sink) +gst_multiudpsink_start (GstBaseSink * bsink) { - guint bc_val; + GstMultiUDPSink *sink; GList *clients; GstUDPClient *client; - int sndsize, ret; - socklen_t len; + GError *err = NULL; + + sink = GST_MULTIUDPSINK (bsink); - if (sink->sockfd == -1) { + if (sink->socket == NULL) { GST_DEBUG_OBJECT (sink, "creating sockets"); /* create sender socket try IP6, fall back to IP4 */ - sink->ss_family = AF_INET6; - if ((sink->sock = socket (AF_INET6, SOCK_DGRAM, 0)) == -1) { - sink->ss_family = AF_INET; - if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) + sink->family = G_SOCKET_FAMILY_IPV6; + if ((sink->used_socket = + g_socket_new (G_SOCKET_FAMILY_IPV6, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) { + sink->family = G_SOCKET_FAMILY_IPV4; + if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4, + G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL) goto no_socket; } GST_DEBUG_OBJECT (sink, "have socket"); - sink->externalfd = FALSE; + sink->external_socket = FALSE; } else { - struct sockaddr_storage myaddr; - GST_DEBUG_OBJECT (sink, "using configured socket"); - /* we use the configured socket, try to get some info about it */ - len = sizeof (myaddr); - if (getsockname (sink->sockfd, (struct sockaddr *) &myaddr, &len) < 0) - goto getsockname_error; - - sink->ss_family = myaddr.ss_family; /* we use the configured socket */ - sink->sock = sink->sockfd; - sink->externalfd = TRUE; + sink->used_socket = G_SOCKET (g_object_ref (sink->socket)); + sink->family = g_socket_get_family (sink->used_socket); + sink->external_socket = TRUE; } - len = sizeof (sndsize); - if (sink->buffer_size != 0) { - sndsize = sink->buffer_size; +#ifdef SO_SNDBUF + { + socklen_t len; + gint sndsize, ret; + + len = sizeof (sndsize); + if (sink->buffer_size != 0) { + sndsize = sink->buffer_size; + + GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize); + /* set buffer size, Note that on Linux this is typically limited to a + * maximum of around 100K. Also a minimum of 128 bytes is required on + * Linux. */ + ret = + setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, + SO_SNDBUF, (void *) &sndsize, len); + if (ret != 0) { + GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), + ("Could not create a buffer of requested %d bytes, %d: %s", + sndsize, ret, g_strerror (errno))); + } + } - GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize); - /* set buffer size, Note that on Linux this is typically limited to a - * maximum of around 100K. Also a minimum of 128 bytes is required on - * Linux. */ + /* read the value of the receive buffer. Note that on linux this returns 2x the + * value we set because the kernel allocates extra memory for metadata. + * The default on Linux is about 100K (which is about 50K without metadata) */ ret = - setsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, - len); - if (ret != 0) { - GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL), - ("Could not create a buffer of requested %d bytes, %d: %s (%d)", - sndsize, ret, g_strerror (errno), errno)); - } + getsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET, SO_SNDBUF, + (void *) &sndsize, &len); + if (ret == 0) + GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize); + else + GST_DEBUG_OBJECT (sink, "could not get udp buffer size"); } +#endif - /* read the value of the receive buffer. Note that on linux this returns 2x the - * value we set because the kernel allocates extra memory for metadata. - * The default on Linux is about 100K (which is about 50K without metadata) */ - ret = - getsockopt (sink->sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &sndsize, &len); - if (ret == 0) - GST_DEBUG_OBJECT (sink, "have udp buffer of %d bytes", sndsize); - else - GST_DEBUG_OBJECT (sink, "could not get udp buffer size"); - - - bc_val = 1; - if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val)) < 0) - goto no_broadcast; + g_socket_set_broadcast (sink->used_socket, TRUE); sink->bytes_to_serve = 0; sink->bytes_served = 0; @@ -1002,39 +880,35 @@ gst_multiudpsink_init_send (GstMultiUDPSink * sink) /* ERRORS */ no_socket: { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL), - ("Could not create socket (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -getsockname_error: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL), - ("Could not getsockname (%d): %s", errorcode, errormessage)); - g_free (errormessage); - return FALSE; - } -no_broadcast: - { - gchar *errormessage = socket_last_error_message (); - int errorcode = socket_last_error_code (); - CLOSE_IF_REQUESTED (sink); - GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL), - ("Could not set broadcast socket option (%d): %s", - errorcode, errormessage)); - g_free (errormessage); + ("Could not create socket: %s", err->message)); + g_clear_error (&err); return FALSE; } } -static void -gst_multiudpsink_close (GstMultiUDPSink * sink) +static gboolean +gst_multiudpsink_stop (GstBaseSink * bsink) { - CLOSE_IF_REQUESTED (sink); + GstMultiUDPSink *udpsink; + + udpsink = GST_MULTIUDPSINK (bsink); + + if (udpsink->used_socket) { + if (udpsink->close_socket || !udpsink->external_socket) { + GError *err = NULL; + + if (!g_socket_close (udpsink->used_socket, &err)) { + GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message); + g_clear_error (&err); + } + } + + g_object_unref (udpsink->used_socket); + udpsink->used_socket = NULL; + } + + return TRUE; } static void @@ -1052,7 +926,7 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port); if (lock) - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); find = g_list_find_custom (sink->clients, &udpclient, (GCompareFunc) client_compare); @@ -1064,25 +938,21 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, client->refcount++; } else { client = create_client (sink, host, port); - - client->sock = &sink->sock; - - if (gst_udp_get_addr (host, port, &client->theiraddr) < 0) - goto getaddrinfo_error; + if (!client) + goto error; g_get_current_time (&now); client->connect_time = GST_TIMEVAL_TO_TIME (now); - if (*client->sock > 0) { + if (sink->used_socket) gst_multiudpsink_configure_client (sink, client); - } GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port); sink->clients = g_list_prepend (sink->clients, client); } if (lock) - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); g_signal_emit (G_OBJECT (sink), gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port); @@ -1091,14 +961,12 @@ gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host, return; /* ERRORS */ -getaddrinfo_error: +error: { GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host, port); - GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?"); - free_client (client); if (lock) - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); return; } } @@ -1120,7 +988,7 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) udpclient.host = (gchar *) host; udpclient.port = port; - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); find = g_list_find_custom (sink->clients, &udpclient, (GCompareFunc) client_compare); if (!find) @@ -1133,33 +1001,44 @@ gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port) client->refcount--; if (client->refcount == 0) { + GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr); + GInetAddress *addr = g_inet_socket_address_get_address (saddr); + GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); - if (*(client->sock) != -1 && sink->auto_multicast - && gst_udp_is_multicast (&client->theiraddr)) - gst_udp_leave_group (*(client->sock), &client->theiraddr); + if (sink->used_socket && sink->auto_multicast + && g_inet_address_get_is_multicast (addr)) { + GError *err = NULL; + + if (!g_socket_leave_multicast_group (sink->used_socket, addr, FALSE, NULL, + &err)) { + GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s", + err->message); + g_clear_error (&err); + } + } /* Unlock to emit signal before we delete the actual client */ - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); g_signal_emit (G_OBJECT (sink), gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port); - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); sink->clients = g_list_delete_link (sink->clients, find); free_client (client); } - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); return; /* ERRORS */ not_found: { - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); GST_WARNING_OBJECT (sink, "client at host %s, port %d not found", host, port); return; @@ -1173,12 +1052,12 @@ gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock) /* we only need to remove the client structure, there is no additional * socket or anything to free for UDP */ if (lock) - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); g_list_foreach (sink->clients, (GFunc) free_client, sink); g_list_free (sink->clients); sink->clients = NULL; if (lock) - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); } void @@ -1187,20 +1066,19 @@ gst_multiudpsink_clear (GstMultiUDPSink * sink) gst_multiudpsink_clear_internal (sink, TRUE); } -GValueArray * +GstStructure * gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host, gint port) { GstUDPClient *client; - GValueArray *result = NULL; + GstStructure *result = NULL; GstUDPClient udpclient; GList *find; - GValue value = { 0 }; udpclient.host = (gchar *) host; udpclient.port = port; - g_mutex_lock (sink->client_lock); + g_mutex_lock (&sink->client_lock); find = g_list_find_custom (sink->clients, &udpclient, (GCompareFunc) client_compare); @@ -1211,78 +1089,50 @@ gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host, client = (GstUDPClient *) find->data; - /* Result is a value array of (bytes_sent, packets_sent, - * connect_time, disconnect_time), all as uint64 */ - result = g_value_array_new (4); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->bytes_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->packets_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->connect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); + result = gst_structure_new_empty ("multiudpsink-stats"); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->disconnect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); + gst_structure_set (result, + "bytes-sent", G_TYPE_UINT64, client->bytes_sent, + "packets-sent", G_TYPE_UINT64, client->packets_sent, + "connect-time", G_TYPE_UINT64, client->connect_time, + "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL); - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); return result; /* ERRORS */ not_found: { - g_mutex_unlock (sink->client_lock); + g_mutex_unlock (&sink->client_lock); GST_WARNING_OBJECT (sink, "client with host %s, port %d not found", host, port); /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may * confuse/break python bindings */ - return g_value_array_new (0); + return gst_structure_new_empty ("multiudpsink-stats"); } } -static GstStateChangeReturn -gst_multiudpsink_change_state (GstElement * element, GstStateChange transition) +static gboolean +gst_multiudpsink_unlock (GstBaseSink * bsink) { - GstStateChangeReturn ret; GstMultiUDPSink *sink; - sink = GST_MULTIUDPSINK (element); + sink = GST_MULTIUDPSINK (bsink); - switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: - if (!gst_multiudpsink_init_send (sink)) - goto no_init; - break; - default: - break; - } + g_cancellable_cancel (sink->cancellable); - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + return TRUE; +} - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_multiudpsink_close (sink); - break; - default: - break; - } - return ret; +static gboolean +gst_multiudpsink_unlock_stop (GstBaseSink * bsink) +{ + GstMultiUDPSink *sink; - /* ERRORS */ -no_init: - { - /* _init_send() posted specific error already */ - return GST_STATE_CHANGE_FAILURE; - } + sink = GST_MULTIUDPSINK (bsink); + + g_cancellable_reset (sink->cancellable); + + return TRUE; }