* Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
* Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
* Copyright (C) <2006> Joni Valtanen <joni.valtanen@movial.fi>
+ * Copyright (C) <2012> Collabora Ltd.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#include "gstudp-marshal.h"
#include "gstdynudpsink.h"
-#include <stdio.h>
-#include <stdlib.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#include <errno.h>
-#include <string.h>
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-#include <sys/types.h>
#include <gst/net/gstnetaddressmeta.h>
GST_DEBUG_CATEGORY_STATIC (dynudpsink_debug);
LAST_SIGNAL
};
-#define UDP_DEFAULT_SOCKFD -1
-#define UDP_DEFAULT_CLOSEFD TRUE
+#define UDP_DEFAULT_SOCKET NULL
+#define UDP_DEFAULT_CLOSE_SOCKET TRUE
+#define UDP_DEFAULT_FAMILY G_SOCKET_FAMILY_IPV4
enum
{
PROP_0,
- PROP_SOCKFD,
- PROP_CLOSEFD
+ PROP_SOCKET,
+ PROP_CLOSE_SOCKET,
+ PROP_FAMILY
};
-#define CLOSE_IF_REQUESTED(udpctx) \
-G_STMT_START { \
- if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
- CLOSE_SOCKET(udpctx->sock); \
- if (udpctx->sock == udpctx->sockfd) \
- udpctx->sockfd = UDP_DEFAULT_SOCKFD; \
- } \
- udpctx->sock = -1; \
-} G_STMT_END
-
static void gst_dynudpsink_finalize (GObject * object);
static GstFlowReturn gst_dynudpsink_render (GstBaseSink * sink,
GstBuffer * buffer);
-static void gst_dynudpsink_close (GstDynUDPSink * sink);
-static GstStateChangeReturn gst_dynudpsink_change_state (GstElement * element,
- GstStateChange transition);
+static gboolean gst_dynudpsink_stop (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_start (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_unlock (GstBaseSink * bsink);
+static gboolean gst_dynudpsink_unlock_stop (GstBaseSink * bsink);
static void gst_dynudpsink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
G_TYPE_STRING, G_TYPE_INT);
- g_object_class_install_property (gobject_class, PROP_SOCKFD,
- g_param_spec_int ("sockfd", "socket handle",
- "Socket to use for UDP sending. (-1 == allocate)",
- -1, G_MAXINT16, UDP_DEFAULT_SOCKFD,
+ g_object_class_install_property (gobject_class, PROP_SOCKET,
+ g_param_spec_object ("socket", "Socket",
+ "Socket to use for UDP sending. (NULL == allocate)",
+ G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
+ g_param_spec_boolean ("close-socket", "Close socket",
+ "Close socket if passed as property on state change",
+ UDP_DEFAULT_CLOSE_SOCKET,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_CLOSEFD,
- g_param_spec_boolean ("closefd", "Close sockfd",
- "Close sockfd if passed as property on state change",
- UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- gstelement_class->change_state = gst_dynudpsink_change_state;
+ g_object_class_install_property (gobject_class, PROP_FAMILY,
+ g_param_spec_enum ("family", "Socket family",
+ "Use IPv4 or IPv6",
+ G_TYPE_SOCKET_FAMILY, UDP_DEFAULT_FAMILY,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sink_template));
"Philippe Khalaf <burger@speedy.org>");
gstbasesink_class->render = gst_dynudpsink_render;
+ gstbasesink_class->start = gst_dynudpsink_start;
+ gstbasesink_class->stop = gst_dynudpsink_stop;
+ gstbasesink_class->unlock = gst_dynudpsink_unlock;
+ gstbasesink_class->unlock_stop = gst_dynudpsink_unlock_stop;
GST_DEBUG_CATEGORY_INIT (dynudpsink_debug, "dynudpsink", 0, "UDP sink");
}
static void
gst_dynudpsink_init (GstDynUDPSink * sink)
{
- WSA_STARTUP (sink);
-
- sink->sockfd = UDP_DEFAULT_SOCKFD;
- sink->closefd = UDP_DEFAULT_CLOSEFD;
- sink->externalfd = FALSE;
+ sink->socket = UDP_DEFAULT_SOCKET;
+ sink->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
+ sink->external_socket = FALSE;
+ sink->family = G_SOCKET_FAMILY_IPV4;
- sink->sock = -1;
+ sink->used_socket = NULL;
+ sink->cancellable = g_cancellable_new ();
}
static void
gst_dynudpsink_finalize (GObject * object)
{
- GstDynUDPSink *udpsink;
+ GstDynUDPSink *sink;
- udpsink = GST_DYNUDPSINK (object);
+ sink = GST_DYNUDPSINK (object);
- if (udpsink->sockfd >= 0 && udpsink->closefd)
- CLOSE_SOCKET (udpsink->sockfd);
+ if (sink->cancellable)
+ g_object_unref (sink->cancellable);
+ sink->cancellable = NULL;
- G_OBJECT_CLASS (parent_class)->finalize (object);
+ if (sink->socket)
+ g_object_unref (sink->socket);
+ sink->socket = NULL;
- WSA_CLEANUP (object);
+ if (sink->used_socket)
+ g_object_unref (sink->used_socket);
+ sink->used_socket = NULL;
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstFlowReturn
gst_dynudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
GstDynUDPSink *sink;
- gint ret;
+ gssize ret;
gsize size;
guint8 *data;
GstNetAddressMeta *meta;
- struct sockaddr_in theiraddr;
- guint16 destport;
- guint32 destaddr;
- const guint8 *destaddr_bytes;
-
- memset (&theiraddr, 0, sizeof (theiraddr));
+ GSocketAddress *addr;
+ GError *err = NULL;
meta = gst_buffer_get_net_address_meta (buffer);
sink = GST_DYNUDPSINK (bsink);
+ /* let's get the address from the metadata */
+ addr = meta->addr;
+
+ if (g_socket_address_get_family (addr) != sink->family)
+ goto invalid_family;
+
data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
GST_DEBUG ("about to send %" G_GSIZE_FORMAT " bytes", size);
- /* let's get the address from the metaata */
- destaddr_bytes =
- g_inet_address_to_bytes (g_inet_socket_address_get_address
- (G_INET_SOCKET_ADDRESS (meta->addr)));
- destaddr = GST_READ_UINT32_BE (destaddr_bytes);
- destport =
- g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (meta->addr));
-
- GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %d port %d", size,
- destaddr, destport);
-
- theiraddr.sin_family = AF_INET;
- theiraddr.sin_addr.s_addr = destaddr;
- theiraddr.sin_port = destport;
-#ifdef G_OS_WIN32
- ret = sendto (sink->sock, (char *) data, size, 0,
-#else
- ret = sendto (sink->sock, data, size, 0,
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *host;
+
+ host =
+ g_inet_address_to_string (g_inet_socket_address_get_address
+ (G_INET_SOCKET_ADDRESS (addr)));
+ GST_DEBUG ("sending %" G_GSIZE_FORMAT " bytes to client %s port %d", size,
+ host, g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)));
+ g_free (host);
+ }
#endif
- (struct sockaddr *) &theiraddr, sizeof (theiraddr));
+ ret =
+ g_socket_send_to (sink->used_socket, addr, (gchar *) data, size,
+ sink->cancellable, &err);
gst_buffer_unmap (buffer, data, size);
- if (ret < 0) {
- if (errno != EINTR && errno != EAGAIN) {
- goto send_error;
- }
- }
+ if (ret < 0)
+ goto send_error;
- GST_DEBUG ("sent %" G_GSIZE_FORMAT " bytes", size);
+ GST_DEBUG ("sent %" G_GSSIZE_FORMAT " bytes", ret);
return GST_FLOW_OK;
send_error:
{
- GST_DEBUG ("got send error %s (%d)", g_strerror (errno), errno);
+ GST_DEBUG ("got send error %s", err->message);
+ g_clear_error (&err);
+ return GST_FLOW_ERROR;
+ }
+invalid_family:
+ {
+ GST_DEBUG ("invalid family (got %d, expected %d)",
+ g_socket_address_get_family (addr), sink->family);
return GST_FLOW_ERROR;
}
}
udpsink = GST_DYNUDPSINK (object);
switch (prop_id) {
- case PROP_SOCKFD:
- if (udpsink->sockfd >= 0 && udpsink->sockfd != udpsink->sock &&
- udpsink->closefd)
- CLOSE_SOCKET (udpsink->sockfd);
- udpsink->sockfd = g_value_get_int (value);
- GST_DEBUG ("setting SOCKFD to %d", udpsink->sockfd);
+ case PROP_SOCKET:
+ if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
+ udpsink->close_socket) {
+ GError *err = NULL;
+
+ if (!g_socket_close (udpsink->socket, &err)) {
+ GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
+ err->message);
+ g_clear_error (&err);
+ }
+ }
+ if (udpsink->socket)
+ g_object_unref (udpsink->socket);
+ udpsink->socket = g_value_dup_object (value);
+ GST_DEBUG ("setting socket to %p", udpsink->socket);
break;
- case PROP_CLOSEFD:
- udpsink->closefd = g_value_get_boolean (value);
+ case PROP_CLOSE_SOCKET:
+ udpsink->close_socket = g_value_get_boolean (value);
+ break;
+ case PROP_FAMILY:
+ udpsink->family = g_value_get_enum (value);
break;
-
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
udpsink = GST_DYNUDPSINK (object);
switch (prop_id) {
- case PROP_SOCKFD:
- g_value_set_int (value, udpsink->sockfd);
+ case PROP_SOCKET:
+ g_value_set_object (value, udpsink->socket);
break;
- case PROP_CLOSEFD:
- g_value_set_boolean (value, udpsink->closefd);
+ case PROP_CLOSE_SOCKET:
+ g_value_set_boolean (value, udpsink->close_socket);
+ break;
+ case PROP_FAMILY:
+ g_value_set_enum (value, udpsink->family);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}
}
-
/* create a socket for sending to remote machine */
static gboolean
-gst_dynudpsink_init_send (GstDynUDPSink * sink)
+gst_dynudpsink_start (GstBaseSink * bsink)
{
- guint bc_val;
+ GstDynUDPSink *udpsink;
+ GError *err = NULL;
+
+ udpsink = GST_DYNUDPSINK (bsink);
- if (sink->sockfd == -1) {
+ if (udpsink->socket == NULL) {
/* create sender socket if none available */
- if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+ if ((udpsink->used_socket =
+ g_socket_new (udpsink->family,
+ G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
goto no_socket;
- bc_val = 1;
- if (setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
- sizeof (bc_val)) < 0)
- goto no_broadcast;
-
- sink->externalfd = TRUE;
+ g_socket_set_broadcast (udpsink->used_socket, TRUE);
+ udpsink->external_socket = FALSE;
} else {
- sink->sock = sink->sockfd;
- sink->externalfd = TRUE;
+ udpsink->used_socket = G_SOCKET (g_object_ref (udpsink->socket));
+ udpsink->external_socket = TRUE;
}
+
return TRUE;
/* ERRORS */
no_socket:
{
- perror ("socket");
- return FALSE;
- }
-no_broadcast:
- {
- perror ("setsockopt");
- CLOSE_IF_REQUESTED (sink);
+ GST_ERROR_OBJECT (udpsink, "Failed to create socket: %s", err->message);
+ g_clear_error (&err);
return FALSE;
}
}
-GValueArray *
+GstStructure *
gst_dynudpsink_get_stats (GstDynUDPSink * sink, const gchar * host, gint port)
{
return NULL;
}
-static void
-gst_dynudpsink_close (GstDynUDPSink * sink)
+static gboolean
+gst_dynudpsink_stop (GstBaseSink * bsink)
{
- CLOSE_IF_REQUESTED (sink);
+ GstDynUDPSink *udpsink;
+
+ udpsink = GST_DYNUDPSINK (bsink);
+
+ if (udpsink->used_socket) {
+ if (udpsink->close_socket || !udpsink->external_socket) {
+ GError *err = NULL;
+
+ if (!g_socket_close (udpsink->used_socket, &err)) {
+ GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
+ g_clear_error (&err);
+ }
+ }
+
+ g_object_unref (udpsink->used_socket);
+ udpsink->used_socket = NULL;
+ }
+
+ return TRUE;
}
-static GstStateChangeReturn
-gst_dynudpsink_change_state (GstElement * element, GstStateChange transition)
+static gboolean
+gst_dynudpsink_unlock (GstBaseSink * bsink)
{
- GstStateChangeReturn ret;
- GstDynUDPSink *sink;
+ GstDynUDPSink *udpsink;
- sink = GST_DYNUDPSINK (element);
+ udpsink = GST_DYNUDPSINK (bsink);
- switch (transition) {
- case GST_STATE_CHANGE_READY_TO_PAUSED:
- if (!gst_dynudpsink_init_send (sink))
- goto no_init;
- break;
- default:
- break;
- }
+ g_cancellable_cancel (udpsink->cancellable);
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ return TRUE;
+}
- switch (transition) {
- case GST_STATE_CHANGE_PAUSED_TO_READY:
- gst_dynudpsink_close (sink);
- break;
- default:
- break;
- }
- return ret;
+static gboolean
+gst_dynudpsink_unlock_stop (GstBaseSink * bsink)
+{
+ GstDynUDPSink *udpsink;
- /* ERRORS */
-no_init:
- {
- return GST_STATE_CHANGE_FAILURE;
- }
+ udpsink = GST_DYNUDPSINK (bsink);
+
+ g_cancellable_reset (udpsink->cancellable);
+
+ return TRUE;
}