/* GStreamer
* Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
* Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
+ * Copyright (C) <2012> Collabora Ltd.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#endif
#include "gstudpsrc.h"
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <stdlib.h>
-
-#if defined _MSC_VER && (_MSC_VER >= 1400)
-#include <io.h>
-#endif
#include <gst/net/gstnetaddressmeta.h>
-
-#ifdef HAVE_FIONREAD_IN_SYS_FILIO
-#include <sys/filio.h>
-#endif
+#include <sys/socket.h>
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
#define GST_CAT_DEFAULT (udpsrc_debug)
#define UDP_DEFAULT_MULTICAST_IFACE NULL
#define UDP_DEFAULT_URI "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
#define UDP_DEFAULT_CAPS NULL
-#define UDP_DEFAULT_SOCKFD -1
+#define UDP_DEFAULT_SOCKET NULL
#define UDP_DEFAULT_BUFFER_SIZE 0
#define UDP_DEFAULT_TIMEOUT 0
#define UDP_DEFAULT_SKIP_FIRST_BYTES 0
-#define UDP_DEFAULT_CLOSEFD TRUE
-#define UDP_DEFAULT_SOCK -1
+#define UDP_DEFAULT_CLOSE_SOCKET TRUE
+#define UDP_DEFAULT_USED_SOCKET NULL
#define UDP_DEFAULT_AUTO_MULTICAST TRUE
#define UDP_DEFAULT_REUSE TRUE
PROP_MULTICAST_IFACE,
PROP_URI,
PROP_CAPS,
- PROP_SOCKFD,
+ PROP_SOCKET,
PROP_BUFFER_SIZE,
PROP_TIMEOUT,
PROP_SKIP_FIRST_BYTES,
- PROP_CLOSEFD,
- PROP_SOCK,
+ PROP_CLOSE_SOCKET,
+ PROP_USED_SOCKET,
PROP_AUTO_MULTICAST,
PROP_REUSE,
PROP_LAST
};
-#define CLOSE_IF_REQUESTED(udpctx) \
-G_STMT_START { \
- if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
- CLOSE_SOCKET(udpctx->sock.fd); \
- if (udpctx->sock.fd == udpctx->sockfd) \
- udpctx->sockfd = UDP_DEFAULT_SOCKFD; \
- } \
- udpctx->sock.fd = UDP_DEFAULT_SOCK; \
-} G_STMT_END
-
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
g_param_spec_boxed ("caps", "Caps",
"The caps of the source pad", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SOCKFD,
- g_param_spec_int ("sockfd", "Socket Handle",
- "Socket to use for UDP reception. (-1 == allocate)",
- -1, G_MAXINT, UDP_DEFAULT_SOCKFD,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_SOCKET,
+ g_param_spec_object ("socket", "Socket",
+ "Socket to use for UDP reception. (NULL == allocate)",
+ G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
g_param_spec_int ("buffer-size", "Buffer Size",
"Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
g_param_spec_uint64 ("timeout", "Timeout",
- "Post a message after timeout microseconds (0 = disabled)", 0,
+ "Post a message after timeout nanoseconds (0 = disabled)", 0,
G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (G_OBJECT_CLASS (klass),
"Skip first bytes", "number of bytes to skip for each udp packet", 0,
G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_CLOSEFD,
- g_param_spec_boolean ("closefd", "Close sockfd",
- "Close sockfd if passed as property on state change",
- UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SOCK,
- g_param_spec_int ("sock", "Socket Handle",
- "Socket currently in use for UDP reception. (-1 = no socket)",
- -1, G_MAXINT, UDP_DEFAULT_SOCK,
- G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
+ g_param_spec_boolean ("close-socket", "Close socket",
+ "Close socket if passed as property on state change",
+ UDP_DEFAULT_CLOSE_SOCKET,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
+ g_param_spec_object ("used-socket", "Socket Handle",
+ "Socket currently in use for UDP reception. (NULL = no socket)",
+ G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
g_param_spec_boolean ("auto-multicast", "Auto Multicast",
"Automatically join/leave multicast groups",
{
WSA_STARTUP (udpsrc);
- gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
+ udpsrc->uri =
+ g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
UDP_DEFAULT_PORT);
- udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
+ udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
+ udpsrc->port = UDP_DEFAULT_PORT;
+ udpsrc->socket = UDP_DEFAULT_SOCKET;
udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
- udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
- udpsrc->externalfd = (udpsrc->sockfd != -1);
+ udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
+ udpsrc->external_socket = (udpsrc->socket != NULL);
udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
- udpsrc->sock.fd = UDP_DEFAULT_SOCK;
+ udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
udpsrc->reuse = UDP_DEFAULT_REUSE;
+ udpsrc->cancellable = g_cancellable_new ();
+
/* configure basesrc to be a live source */
gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
/* make basesrc output a segment in time */
if (udpsrc->caps)
gst_caps_unref (udpsrc->caps);
+ udpsrc->caps = NULL;
g_free (udpsrc->multi_iface);
+ udpsrc->multi_iface = NULL;
- gst_udp_uri_free (&udpsrc->uri);
- g_free (udpsrc->uristr);
+ g_free (udpsrc->uri);
+ udpsrc->uri = NULL;
- if (udpsrc->sockfd >= 0 && udpsrc->closefd)
- CLOSE_SOCKET (udpsrc->sockfd);
+ if (udpsrc->socket)
+ g_object_unref (udpsrc->socket);
+ udpsrc->socket = NULL;
- WSA_CLEANUP (object);
+ if (udpsrc->used_socket)
+ g_object_unref (udpsrc->used_socket);
+ udpsrc->used_socket = NULL;
+
+ if (udpsrc->cancellable)
+ g_object_unref (udpsrc->cancellable);
+ udpsrc->cancellable = NULL;
G_OBJECT_CLASS (parent_class)->finalize (object);
}
udpsrc = GST_UDPSRC (src);
- if (udpsrc->caps)
- return gst_caps_ref (udpsrc->caps);
- else
- return gst_caps_new_any ();
-}
-
-/* read a message from the error queue */
-static void
-clear_error (GstUDPSrc * udpsrc)
-{
-#if defined (MSG_ERRQUEUE)
- struct msghdr cmsg;
- char cbuf[128];
- char msgbuf[CMSG_SPACE (128)];
- struct iovec iov;
-
- /* Flush ERRORS from fd so next poll will not return at once */
- /* No need for address : We look for local error */
- cmsg.msg_name = NULL;
- cmsg.msg_namelen = 0;
-
- /* IOV */
- memset (&cbuf, 0, sizeof (cbuf));
- iov.iov_base = cbuf;
- iov.iov_len = sizeof (cbuf);
- cmsg.msg_iov = &iov;
- cmsg.msg_iovlen = 1;
-
- /* msg_control */
- memset (&msgbuf, 0, sizeof (msgbuf));
- cmsg.msg_control = &msgbuf;
- cmsg.msg_controllen = sizeof (msgbuf);
-
- recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
-#endif
+ if (udpsrc->caps) {
+ return (filter) ? gst_caps_intersect_full (filter, udpsrc->caps,
+ GST_CAPS_INTERSECT_FIRST) : gst_caps_ref (udpsrc->caps);
+ } else {
+ return (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
+ }
}
static GstFlowReturn
gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
{
GstUDPSrc *udpsrc;
- GstNetAddressMeta *meta;
GstBuffer *outbuf;
- union gst_sockaddr
- {
- struct sockaddr sa;
- struct sockaddr_in sa_in;
- struct sockaddr_in6 sa_in6;
- struct sockaddr_storage sa_stor;
- } sa;
- socklen_t slen;
+ GSocketAddress *saddr = NULL;
guint8 *pktdata;
gint pktsize;
gsize offset;
-#ifdef G_OS_UNIX
- gint readsize;
-#elif defined G_OS_WIN32
- gulong readsize;
-#endif
- GstClockTime timeout;
- gint ret;
+ gssize readsize;
+ gssize ret;
gboolean try_again;
+ GError *err = NULL;
udpsrc = GST_UDPSRC_CAST (psrc);
retry:
/* quick check, avoid going in select when we already have data */
- readsize = 0;
- if (G_UNLIKELY ((ret =
- IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
- goto ioctl_failed;
-
+ readsize = g_socket_get_available_bytes (udpsrc->used_socket);
if (readsize > 0)
goto no_select;
- if (udpsrc->timeout > 0) {
- timeout = udpsrc->timeout * GST_USECOND;
- } else {
- timeout = GST_CLOCK_TIME_NONE;
- }
-
do {
try_again = FALSE;
GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
udpsrc->timeout);
- ret = gst_poll_wait (udpsrc->fdset, timeout);
- GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
- if (G_UNLIKELY (ret < 0)) {
- if (errno == EBUSY)
+ if (!g_socket_condition_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
+ udpsrc->cancellable, &err)) {
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
+ || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
goto stopped;
-#ifdef G_OS_WIN32
- if (WSAGetLastError () != WSAEINTR)
- goto select_error;
-#else
- if (errno != EAGAIN && errno != EINTR)
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+ /* timeout, post element message */
+ gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
+ gst_message_new_element (GST_OBJECT_CAST (udpsrc),
+ gst_structure_new ("GstUDPSrcTimeout",
+ "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
+ } else {
goto select_error;
-#endif
- try_again = TRUE;
- } else if (G_UNLIKELY (ret == 0)) {
- /* timeout, post element message */
- gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
- gst_message_new_element (GST_OBJECT_CAST (udpsrc),
- gst_structure_new ("GstUDPSrcTimeout",
- "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
+ }
+
try_again = TRUE;
}
} while (G_UNLIKELY (try_again));
/* ask how much is available for reading on the socket, this should be exactly
* one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */
- readsize = 0;
- if (G_UNLIKELY ((ret =
- IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
- goto ioctl_failed;
+ readsize = g_socket_get_available_bytes (udpsrc->used_socket);
+ if (G_UNLIKELY (readsize < 0))
+ goto get_available_error;
/* If we get here and the readsize is zero, then either select was woken up
* by activity that is not a read, or a poll error occurred, or a UDP packet
if (G_UNLIKELY (!readsize)) {
/* try to read a packet (and it will be ignored),
* in case a packet with no data arrived */
- slen = sizeof (sa);
- recvfrom (udpsrc->sock.fd, (char *) &slen, 0, 0, &sa.sa, &slen);
- /* clear any error, in case a poll error occurred */
- clear_error (udpsrc);
+ pktdata = NULL;
+ pktsize = 0;
+ ret =
+ g_socket_receive_from (udpsrc->used_socket, NULL, (gchar *) pktdata,
+ pktsize, udpsrc->cancellable, &err);
+ if (G_UNLIKELY (ret < 0))
+ goto receive_error;
/* poll again */
goto retry;
pktsize = readsize;
offset = 0;
- while (TRUE) {
- slen = sizeof (sa);
-#ifdef G_OS_WIN32
- ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
- &slen);
-#else
- ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
-#endif
- if (G_UNLIKELY (ret < 0)) {
-#ifdef G_OS_WIN32
- /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
- * generated a "port unreachable" ICMP response. We ignore that and try
- * again. */
- if (WSAGetLastError () == WSAECONNRESET) {
- g_free (pktdata);
- pktdata = NULL;
- goto retry;
- }
- if (WSAGetLastError () != WSAEINTR)
- goto receive_error;
-#else
- if (errno != EAGAIN && errno != EINTR)
- goto receive_error;
-#endif
- } else
- break;
- }
+ if (saddr)
+ g_object_unref (saddr);
+ saddr = NULL;
+
+ ret =
+ g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) pktdata,
+ pktsize, udpsrc->cancellable, &err);
+
+ if (G_UNLIKELY (ret < 0))
+ goto receive_error;
/* patch pktdata and len when stripping off the headers */
if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
gst_memory_new_wrapped (0, pktdata, g_free, pktsize, offset, ret));
/* use buffer metadata so receivers can also track the address */
- meta = gst_buffer_add_net_address_meta (outbuf);
-
- switch (sa.sa.sa_family) {
- case AF_INET:
- {
- gst_net_address_set_ip4_address (&meta->naddr, sa.sa_in.sin_addr.s_addr,
- sa.sa_in.sin_port);
- }
- break;
- case AF_INET6:
- {
- guint8 ip6[16];
-
- memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
- gst_net_address_set_ip6_address (&meta->naddr, ip6, sa.sa_in6.sin6_port);
- }
- break;
- default:
-#ifdef G_OS_WIN32
- WSASetLastError (WSAEAFNOSUPPORT);
-#else
- errno = EAFNOSUPPORT;
-#endif
- goto receive_error;
+ if (saddr) {
+ gst_buffer_add_net_address_meta (outbuf, saddr);
+ g_object_unref (saddr);
}
+ saddr = NULL;
+
GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
*buf = GST_BUFFER_CAST (outbuf);
select_error:
{
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
- ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
+ ("select error: %s", err->message));
+ g_clear_error (&err);
return GST_FLOW_ERROR;
}
stopped:
GST_DEBUG ("stop called");
return GST_FLOW_WRONG_STATE;
}
-ioctl_failed:
+get_available_error:
{
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
- ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
+ ("get available bytes failed"));
return GST_FLOW_ERROR;
}
receive_error:
{
g_free (pktdata);
-#ifdef G_OS_WIN32
- GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
- ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
-#else
- GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
- ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
-#endif
- return GST_FLOW_ERROR;
+
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
+ g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ g_clear_error (&err);
+ return GST_FLOW_WRONG_STATE;
+ } else {
+ GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+ ("receive error %d: %s", ret, err->message));
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
}
skip_error:
{
static gboolean
gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
{
- if (gst_udp_parse_uri (uri, &src->uri) < 0)
+ gchar *host;
+ guint16 port;
+
+ if (!gst_udp_parse_uri (uri, &host, &port))
goto wrong_uri;
- if (src->uri.port == -1)
- src->uri.port = UDP_DEFAULT_PORT;
+ if (port == -1)
+ port = UDP_DEFAULT_PORT;
+
+ g_free (src->host);
+ src->host = host;
+ src->port = port;
+
+ g_free (src->uri);
+ src->uri = g_strdup (uri);
return TRUE;
udpsrc->buffer_size = g_value_get_int (value);
break;
case PROP_PORT:
- gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
+ udpsrc->port = g_value_get_int (value);
+ g_free (udpsrc->uri);
+ udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
break;
case PROP_MULTICAST_GROUP:
{
const gchar *group;
+ g_free (udpsrc->host);
if ((group = g_value_get_string (value)))
- gst_udp_uri_update (&udpsrc->uri, group, -1);
+ udpsrc->host = g_strdup (group);
else
- gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
+ udpsrc->host = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
+
+ g_free (udpsrc->uri);
+ udpsrc->uri = g_strdup_printf ("udp://%s:%u", udpsrc->host, udpsrc->port);
break;
}
case PROP_MULTICAST_IFACE:
gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
break;
}
- case PROP_SOCKFD:
- if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
- udpsrc->closefd)
- CLOSE_SOCKET (udpsrc->sockfd);
- udpsrc->sockfd = g_value_get_int (value);
- GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
+ case PROP_SOCKET:
+ if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
+ udpsrc->close_socket) {
+ GError *err = NULL;
+
+ if (!g_socket_close (udpsrc->socket, &err)) {
+ GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
+ err->message);
+ g_clear_error (&err);
+ }
+ }
+ if (udpsrc->socket)
+ g_object_unref (udpsrc->socket);
+ udpsrc->socket = g_value_dup_object (value);
+ GST_DEBUG ("setting socket to %p", udpsrc->socket);
break;
case PROP_TIMEOUT:
udpsrc->timeout = g_value_get_uint64 (value);
case PROP_SKIP_FIRST_BYTES:
udpsrc->skip_first_bytes = g_value_get_int (value);
break;
- case PROP_CLOSEFD:
- udpsrc->closefd = g_value_get_boolean (value);
+ case PROP_CLOSE_SOCKET:
+ udpsrc->close_socket = g_value_get_boolean (value);
break;
case PROP_AUTO_MULTICAST:
udpsrc->auto_multicast = g_value_get_boolean (value);
g_value_set_int (value, udpsrc->buffer_size);
break;
case PROP_PORT:
- g_value_set_int (value, udpsrc->uri.port);
+ g_value_set_int (value, udpsrc->port);
break;
case PROP_MULTICAST_GROUP:
- g_value_set_string (value, udpsrc->uri.host);
+ g_value_set_string (value, udpsrc->host);
break;
case PROP_MULTICAST_IFACE:
g_value_set_string (value, udpsrc->multi_iface);
break;
case PROP_URI:
- g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
+ g_value_take_string (value, udpsrc->uri);
break;
case PROP_CAPS:
gst_value_set_caps (value, udpsrc->caps);
break;
- case PROP_SOCKFD:
- g_value_set_int (value, udpsrc->sockfd);
+ case PROP_SOCKET:
+ g_value_set_object (value, udpsrc->socket);
break;
case PROP_TIMEOUT:
g_value_set_uint64 (value, udpsrc->timeout);
case PROP_SKIP_FIRST_BYTES:
g_value_set_int (value, udpsrc->skip_first_bytes);
break;
- case PROP_CLOSEFD:
- g_value_set_boolean (value, udpsrc->closefd);
+ case PROP_CLOSE_SOCKET:
+ g_value_set_boolean (value, udpsrc->close_socket);
break;
- case PROP_SOCK:
- g_value_set_int (value, udpsrc->sock.fd);
+ case PROP_USED_SOCKET:
+ g_value_set_object (value, udpsrc->used_socket);
break;
case PROP_AUTO_MULTICAST:
g_value_set_boolean (value, udpsrc->auto_multicast);
static gboolean
gst_udpsrc_start (GstBaseSrc * bsrc)
{
- guint bc_val;
- guint err_val;
- gint reuse;
- int port;
GstUDPSrc *src;
- gint ret;
- int rcvsize;
- struct sockaddr_storage bind_address;
- socklen_t len;
+ GInetAddress *addr, *bind_addr;
+ GSocketAddress *bind_saddr;
+ GResolver *resolver;
+ GError *err = NULL;
+
src = GST_UDPSRC (bsrc);
- if (src->sockfd == -1) {
+ if (src->socket == NULL) {
/* need to allocate a socket */
- GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
- src->uri.port);
- if ((ret =
- gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
- goto getaddrinfo_error;
-
- if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
- goto no_socket;
+ GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->host, src->port);
+
+ addr = g_inet_address_new_from_string (src->host);
+ if (!addr) {
+ GList *results;
+
+ resolver = g_resolver_get_default ();
+ results =
+ g_resolver_lookup_by_name (resolver, src->host, src->cancellable,
+ &err);
+ if (!results)
+ goto name_resolve;
+ addr = G_INET_ADDRESS (g_object_ref (results->data));
+
+ g_resolver_free_addresses (results);
+ g_object_unref (resolver);
+ }
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *ip = g_inet_address_to_string (addr);
- src->sock.fd = ret;
- src->externalfd = FALSE;
-
- GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);
-
- GST_DEBUG_OBJECT (src, "setting reuse %d", src->reuse);
- reuse = src->reuse ? 1 : 0;
- if ((ret =
- setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
- sizeof (reuse))) < 0)
- goto setsockopt_error;
-
- GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
-
- /* Take a temporary copy of the address in case we need to fix it for bind */
- memcpy (&bind_address, &src->myaddr, sizeof (struct sockaddr_storage));
-
-#ifdef G_OS_WIN32
- /* Windows does not allow binding to a multicast group so fix source address */
- if (gst_udp_is_multicast (&src->myaddr)) {
- switch (((struct sockaddr *) &bind_address)->sa_family) {
- case AF_INET:
- ((struct sockaddr_in *) &bind_address)->sin_addr.s_addr =
- htonl (INADDR_ANY);
- break;
- case AF_INET6:
- ((struct sockaddr_in6 *) &bind_address)->sin6_addr = in6addr_any;
- break;
- default:
- break;
- }
+ GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
+ g_free (ip);
}
#endif
- len = gst_udp_get_sockaddr_length (&bind_address);
- if ((ret = bind (src->sock.fd, (struct sockaddr *) &bind_address, len)) < 0)
+ if ((src->used_socket =
+ g_socket_new (g_inet_address_get_family (addr),
+ G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
+ goto no_socket;
+
+ src->external_socket = FALSE;
+
+ GST_DEBUG_OBJECT (src, "got socket %p", src->used_socket);
+
+ if (src->addr)
+ g_object_unref (src->addr);
+ src->addr =
+ G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
+
+ GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
+
+ if (g_inet_address_get_is_multicast (addr))
+ bind_addr = g_inet_address_new_any (g_inet_address_get_family (addr));
+ else
+ bind_addr = G_INET_ADDRESS (g_object_ref (addr));
+
+ g_object_unref (addr);
+
+ bind_saddr = g_inet_socket_address_new (bind_addr, src->port);
+ g_object_unref (bind_addr);
+ if (!g_socket_bind (src->used_socket, bind_saddr, src->reuse, &err))
goto bind_error;
- if (!gst_udp_is_multicast (&src->myaddr)) {
- len = sizeof (src->myaddr);
- if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
- &len)) < 0)
- goto getsockname_error;
- }
+ g_object_unref (bind_saddr);
} else {
- GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
+ GST_DEBUG_OBJECT (src, "using provided socket %d", src->socket);
/* we use the configured socket, try to get some info about it */
- len = sizeof (src->myaddr);
- if ((ret =
- getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
- &len)) < 0)
+ src->used_socket = G_SOCKET (g_object_ref (src->socket));
+ src->external_socket = TRUE;
+
+ if (src->addr)
+ g_object_unref (src->addr);
+ src->addr =
+ G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
+ &err));
+ if (!src->addr)
goto getsockname_error;
-
- src->sock.fd = src->sockfd;
- src->externalfd = TRUE;
}
- len = sizeof (rcvsize);
- if (src->buffer_size != 0) {
- rcvsize = src->buffer_size;
+ if (src->timeout)
+ g_socket_set_timeout (src->used_socket, src->timeout / GST_SECOND);
- GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
- /* set buffer size, Note that on Linux this is typically limited to a
- * maximum of around 100K. Also a minimum of 128 bytes is required on
- * Linux. */
- ret =
- setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
- len);
- if (ret != 0) {
- GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
- ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
- rcvsize, ret, g_strerror (errno), errno));
+#ifdef SO_RECVBUF
+ {
+ gint rcvsize, ret;
+
+ len = sizeof (rcvsize);
+ if (src->buffer_size != 0) {
+ rcvsize = src->buffer_size;
+
+ GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
+ /* set buffer size, Note that on Linux this is typically limited to a
+ * maximum of around 100K. Also a minimum of 128 bytes is required on
+ * Linux. */
+ ret =
+ setsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
+ (void *) &rcvsize, len);
+ if (ret != 0) {
+ GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+ ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
+ rcvsize, ret, g_strerror (errno), errno));
+ }
}
- }
- /* read the value of the receive buffer. Note that on linux this returns 2x the
- * value we set because the kernel allocates extra memory for metadata.
- * The default on Linux is about 100K (which is about 50K without metadata) */
- ret =
- getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
- if (ret == 0)
- GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
- else
- GST_DEBUG_OBJECT (src, "could not get udp buffer size");
-
- bc_val = 1;
- if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
- sizeof (bc_val))) < 0) {
- GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
- ("could not configure socket for broadcast %d: %s (%d)", ret,
- g_strerror (errno), errno));
- }
-
- /* Accept ERRQUEUE to get and flush icmp errors */
- err_val = 1;
-#if defined (IP_RECVERR)
- if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
- sizeof (err_val))) < 0) {
- GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
- ("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
- g_strerror (errno), errno));
+ /* read the value of the receive buffer. Note that on linux this returns 2x the
+ * value we set because the kernel allocates extra memory for metadata.
+ * The default on Linux is about 100K (which is about 50K without metadata) */
+ ret =
+ getsockopt (g_socket_get_fd (src->used_socket), SOL_SOCKET, SO_RCVBUF,
+ (void *) &rcvsize, &len);
+ if (ret == 0)
+ GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
+ else
+ GST_DEBUG_OBJECT (src, "could not get udp buffer size");
}
#endif
- if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
- GST_DEBUG_OBJECT (src, "joining multicast group %s", src->uri.host);
- ret = gst_udp_join_group (src->sock.fd, &src->myaddr, src->multi_iface);
- if (ret < 0)
+ g_socket_set_broadcast (src->used_socket, TRUE);
+
+ if (src->auto_multicast
+ &&
+ g_inet_address_get_is_multicast (g_inet_socket_address_get_address
+ (src->addr))) {
+ GST_DEBUG_OBJECT (src, "joining multicast group %s", src->host);
+ if (!g_socket_join_multicast_group (src->used_socket,
+ g_inet_socket_address_get_address (src->addr),
+ FALSE, src->multi_iface, &err))
goto membership;
}
/* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
* follows ss_family on both */
- port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
- GST_DEBUG_OBJECT (src, "bound, on port %d", port);
- if (port != src->uri.port) {
- src->uri.port = port;
- GST_DEBUG_OBJECT (src, "notifying port %d", port);
- g_object_notify (G_OBJECT (src), "port");
- }
+ {
+ GInetSocketAddress *addr;
+ guint16 port;
- if ((src->fdset = gst_poll_new (TRUE)) == NULL)
- goto no_fdset;
+ addr =
+ G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
+ &err));
+ if (!addr)
+ goto getsockname_error;
- gst_poll_add_fd (src->fdset, &src->sock);
- gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
+ port = g_inet_socket_address_get_port (addr);
+ GST_DEBUG_OBJECT (src, "bound, on port %d", port);
+ if (port != src->port) {
+ src->port = port;
+ GST_DEBUG_OBJECT (src, "notifying port %d", port);
+ g_object_notify (G_OBJECT (src), "port");
+ }
+ g_object_unref (addr);
+ }
return TRUE;
/* ERRORS */
-getaddrinfo_error:
+name_resolve:
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("getaddrinfo failed: %s (%d)", gai_strerror (ret), ret));
+ ("Name resolval failed: %s", err->message));
+ g_clear_error (&err);
+ g_object_unref (resolver);
return FALSE;
}
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
- ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
- return FALSE;
- }
-setsockopt_error:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
+ ("no socket error: %s", err->message));
+ g_clear_error (&err);
+ g_object_unref (addr);
return FALSE;
}
bind_error:
{
- CLOSE_IF_REQUESTED (src);
+ gst_udpsrc_stop (GST_BASE_SRC (src));
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
+ ("bind failed: %s", err->message));
+ g_clear_error (&err);
+ g_object_unref (bind_saddr);
return FALSE;
}
membership:
{
- CLOSE_IF_REQUESTED (src);
+ gst_udpsrc_stop (GST_BASE_SRC (src));
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
+ ("could add membership: %s", err->message));
+ g_clear_error (&err);
return FALSE;
}
getsockname_error:
{
- CLOSE_IF_REQUESTED (src);
+ gst_udpsrc_stop (GST_BASE_SRC (src));
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
- return FALSE;
- }
-no_fdset:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
- ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
- errno));
+ ("getsockname failed: %s", err->message));
+ g_clear_error (&err);
return FALSE;
}
}
src = GST_UDPSRC (bsrc);
GST_LOG_OBJECT (src, "Flushing");
- gst_poll_set_flushing (src->fdset, TRUE);
+ g_cancellable_cancel (src->cancellable);
return TRUE;
}
src = GST_UDPSRC (bsrc);
GST_LOG_OBJECT (src, "No longer flushing");
- gst_poll_set_flushing (src->fdset, FALSE);
+ g_cancellable_reset (src->cancellable);
return TRUE;
}
GST_DEBUG ("stopping, closing sockets");
- if (src->sock.fd >= 0) {
- if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
- GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->uri.host);
- gst_udp_leave_group (src->sock.fd, &src->myaddr);
+ if (src->used_socket) {
+ if (src->auto_multicast
+ &&
+ g_inet_address_get_is_multicast (g_inet_socket_address_get_address
+ (src->addr))) {
+ GError *err = NULL;
+
+ GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->host);
+
+ if (!g_socket_leave_multicast_group (src->used_socket,
+ g_inet_socket_address_get_address (src->addr), FALSE,
+ src->multi_iface, NULL)) {
+ GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
+ err->message);
+ g_clear_error (&err);
+ }
+ }
+
+ if (src->close_socket || !src->external_socket) {
+ GError *err = NULL;
+ if (!g_socket_close (src->used_socket, &err)) {
+ GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
+ g_clear_error (&err);
+ }
}
- CLOSE_IF_REQUESTED (src);
- }
- if (src->fdset) {
- gst_poll_free (src->fdset);
- src->fdset = NULL;
+ g_object_unref (src->used_socket);
+ src->used_socket = NULL;
+ g_object_unref (src->addr);
+ src->addr = NULL;
}
return TRUE;
{
GstUDPSrc *src = GST_UDPSRC (handler);
- /* FIXME: make thread-safe; maybe we can get rid of this assignment here? */
- g_free (src->uristr);
- src->uristr = gst_udp_uri_string (&src->uri);
-
- return g_strdup (src->uristr);
+ return g_strdup (src->uri);
}
static gboolean