gst/udp/: Added multifdsink to send UDP to multiple addresses.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 12 May 2005 15:32:51 +0000 (15:32 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 12 May 2005 15:32:51 +0000 (15:32 +0000)
Original commit message from CVS:
* gst/udp/.cvsignore:
* gst/udp/Makefile.am:
* gst/udp/gstmultiudpsink.c: (gst_multiudpsink_get_type),
(gst_multiudpsink_base_init), (gst_multiudpsink_class_init),
(gst_multiudpsink_init), (gst_multiudpsink_finalize),
(gst_multiudpsink_get_times), (gst_multiudpsink_render),
(gst_multiudpsink_set_property), (gst_multiudpsink_get_property),
(gst_multiudpsink_init_send), (gst_multiudpsink_close),
(gst_multiudpsink_add), (gst_multiudpsink_remove),
(gst_multiudpsink_clear), (gst_multiudpsink_get_stats),
(gst_multiudpsink_change_state):
* gst/udp/gstmultiudpsink.h:
* gst/udp/gstudp-marshal.list:
* gst/udp/gstudp.c: (plugin_init):
* gst/udp/gstudp.h:
* gst/udp/gstudpsink.c: (gst_udpsink_get_type),
(gst_udpsink_base_init), (gst_udpsink_class_init),
(gst_udpsink_init), (gst_udpsink_set_uri),
(gst_udpsink_set_property), (gst_udpsink_get_property),
(gst_udpsink_uri_get_type), (gst_udpsink_uri_get_protocols),
(gst_udpsink_uri_get_uri), (gst_udpsink_uri_set_uri),
(gst_udpsink_uri_handler_init):
* gst/udp/gstudpsink.h:
* gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
(gst_udpsrc_base_init), (gst_udpsrc_class_init),
(gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
(gst_udpsrc_unlock), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Added multifdsink to send UDP to multiple addresses.
Cleaned up UDP source/sink elements some more.
Make UDP sink extends from multiudpsink.

12 files changed:
ChangeLog
gst/udp/.gitignore
gst/udp/Makefile.am
gst/udp/gstmultiudpsink.c [new file with mode: 0644]
gst/udp/gstmultiudpsink.h [new file with mode: 0644]
gst/udp/gstudp-marshal.list [new file with mode: 0644]
gst/udp/gstudp.c
gst/udp/gstudp.h
gst/udp/gstudpsink.c
gst/udp/gstudpsink.h
gst/udp/gstudpsrc.c
gst/udp/gstudpsrc.h

index f081936..b9d2b82 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,37 @@
+2005-05-12  Wim Taymans  <wim@fluendo.com>
+
+       * gst/udp/.cvsignore:
+       * gst/udp/Makefile.am:
+       * gst/udp/gstmultiudpsink.c: (gst_multiudpsink_get_type),
+       (gst_multiudpsink_base_init), (gst_multiudpsink_class_init),
+       (gst_multiudpsink_init), (gst_multiudpsink_finalize),
+       (gst_multiudpsink_get_times), (gst_multiudpsink_render),
+       (gst_multiudpsink_set_property), (gst_multiudpsink_get_property),
+       (gst_multiudpsink_init_send), (gst_multiudpsink_close),
+       (gst_multiudpsink_add), (gst_multiudpsink_remove),
+       (gst_multiudpsink_clear), (gst_multiudpsink_get_stats),
+       (gst_multiudpsink_change_state):
+       * gst/udp/gstmultiudpsink.h:
+       * gst/udp/gstudp-marshal.list:
+       * gst/udp/gstudp.c: (plugin_init):
+       * gst/udp/gstudp.h:
+       * gst/udp/gstudpsink.c: (gst_udpsink_get_type),
+       (gst_udpsink_base_init), (gst_udpsink_class_init),
+       (gst_udpsink_init), (gst_udpsink_set_uri),
+       (gst_udpsink_set_property), (gst_udpsink_get_property),
+       (gst_udpsink_uri_get_type), (gst_udpsink_uri_get_protocols),
+       (gst_udpsink_uri_get_uri), (gst_udpsink_uri_set_uri),
+       (gst_udpsink_uri_handler_init):
+       * gst/udp/gstudpsink.h:
+       * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
+       (gst_udpsrc_base_init), (gst_udpsrc_class_init),
+       (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
+       (gst_udpsrc_unlock), (gst_udpsrc_stop):
+       * gst/udp/gstudpsrc.h:
+       Added multifdsink to send UDP to multiple addresses.
+       Cleaned up UDP source/sink elements some more.
+       Make UDP sink extends from multiudpsink.
+
 2005-05-12  Tim-Philipp Müller  <tim at centricular dot net>
 
        * ext/mad/gstmad.c: (gst_mad_src_query), (gst_mad_sink_event):
index 08f5ed3..6290889 100644 (file)
@@ -5,3 +5,8 @@ Makefile.in
 *.la
 .deps
 .libs
+gstudp-enumtypes.c
+gstudp-enumtypes.h
+gstudp-marshal.c
+gstudp-marshal.h
+
index b888c06..d88d113 100644 (file)
@@ -1,11 +1,28 @@
-
 plugin_LTLIBRARIES = libgstudp.la
 
-libgstudp_la_SOURCES = gstudp.c gstudpsrc.c gstudpsink.c
+# variables used for enum/marshal generation
+glib_enum_headers = gstudp.h
+glib_enum_define = GST_UDP
+glib_enum_prefix = gst_udp
+
+include $(top_srcdir)/common/glib-gen.mak
+
+built_sources = gstudp-enumtypes.c gstudp-marshal.c
+built_headers = gstudp-enumtypes.h gstudp-marshal.h
+
+BUILT_SOURCES = $(built_sources) $(built_headers)
+
+libgstudp_la_SOURCES = gstudp.c gstudpsrc.c gstudpsink.c gstmultiudpsink.c
 libgstudp_la_CFLAGS = $(GST_CFLAGS)
 libgstudp_la_LIBADD =
 libgstudp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS)
 
-noinst_HEADERS = gstudpsink.h gstudpsrc.h gstudp.h
+nodist_libgstudp_la_SOURCES = \
+        $(built_sources)
+
+noinst_HEADERS = gstudpsink.h gstudpsrc.h gstudp.h gstmultiudpsink.h
+
+EXTRA_DIST = README gstudp-marshal.list
+
+CLEANFILES = $(BUILT_SOURCES)
 
-EXTRA_DIST = README
diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c
new file mode 100644 (file)
index 0000000..c7ea9e2
--- /dev/null
@@ -0,0 +1,437 @@
+/* GStreamer
+ * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstudp-marshal.h"
+#include "gstmultiudpsink.h"
+
+GST_DEBUG_CATEGORY (multiudpsink_debug);
+#define GST_CAT_DEFAULT (multiudpsink_debug)
+
+static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+/* elementfactory information */
+static GstElementDetails gst_multiudpsink_details =
+GST_ELEMENT_DETAILS ("UDP packet sender",
+    "Sink/Network",
+    "Send data over the network via UDP",
+    "Wim Taymans <wim@fluendo.com>");
+
+/* MultiUDPSink signals and args */
+enum
+{
+  /* methods */
+  SIGNAL_ADD,
+  SIGNAL_REMOVE,
+  SIGNAL_CLEAR,
+  SIGNAL_GET_STATS,
+
+  /* signals */
+  SIGNAL_CLIENT_ADDED,
+  SIGNAL_CLIENT_REMOVED,
+
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+enum
+{
+  PROP_0,
+  /* FILL ME */
+};
+
+static void gst_multiudpsink_base_init (gpointer g_class);
+static void gst_multiudpsink_class_init (GstMultiUDPSink * klass);
+static void gst_multiudpsink_init (GstMultiUDPSink * udpsink);
+static void gst_multiudpsink_finalize (GObject * object);
+
+static void gst_multiudpsink_get_times (GstBaseSink * sink, GstBuffer * buffer,
+    GstClockTime * start, GstClockTime * end);
+static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
+    GstBuffer * buffer);
+static GstElementStateReturn gst_multiudpsink_change_state (GstElement *
+    element);
+
+static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+static GstElementClass *parent_class = NULL;
+
+static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };
+
+GType
+gst_multiudpsink_get_type (void)
+{
+  static GType multiudpsink_type = 0;
+
+  if (!multiudpsink_type) {
+    static const GTypeInfo multiudpsink_info = {
+      sizeof (GstMultiUDPSinkClass),
+      gst_multiudpsink_base_init,
+      NULL,
+      (GClassInitFunc) gst_multiudpsink_class_init,
+      NULL,
+      NULL,
+      sizeof (GstMultiUDPSink),
+      0,
+      (GInstanceInitFunc) gst_multiudpsink_init,
+      NULL
+    };
+
+    multiudpsink_type =
+        g_type_register_static (GST_TYPE_BASESINK, "GstMultiUDPSink",
+        &multiudpsink_info, 0);
+  }
+  return multiudpsink_type;
+}
+
+static void
+gst_multiudpsink_base_init (gpointer g_class)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&sink_template));
+
+  gst_element_class_set_details (element_class, &gst_multiudpsink_details);
+}
+
+static void
+gst_multiudpsink_class_init (GstMultiUDPSink * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+  GstBaseSinkClass *gstbasesink_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+  gstbasesink_class = (GstBaseSinkClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_BASESINK);
+
+  gobject_class->set_property = gst_multiudpsink_set_property;
+  gobject_class->get_property = gst_multiudpsink_get_property;
+  gobject_class->finalize = gst_multiudpsink_finalize;
+
+  gst_multiudpsink_signals[SIGNAL_ADD] =
+      g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
+      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
+      G_TYPE_STRING, G_TYPE_INT);
+  gst_multiudpsink_signals[SIGNAL_REMOVE] =
+      g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
+      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
+      G_TYPE_STRING, G_TYPE_INT);
+  gst_multiudpsink_signals[SIGNAL_CLEAR] =
+      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
+      NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+  gst_multiudpsink_signals[SIGNAL_GET_STATS] =
+      g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+      G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
+      NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
+      G_TYPE_STRING, G_TYPE_INT);
+
+  gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
+      g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
+      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
+      G_TYPE_STRING, G_TYPE_INT);
+  gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
+      g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
+          client_removed), NULL, NULL, gst_udp_marshal_VOID__STRING_INT,
+      G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
+
+  gstelement_class->change_state = gst_multiudpsink_change_state;
+
+  gstbasesink_class->get_times = gst_multiudpsink_get_times;
+  gstbasesink_class->render = gst_multiudpsink_render;
+
+  GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
+}
+
+
+static void
+gst_multiudpsink_init (GstMultiUDPSink * sink)
+{
+  sink->client_lock = g_mutex_new ();
+}
+
+static void
+gst_multiudpsink_finalize (GObject * object)
+{
+  GstMultiUDPSink *sink;
+
+  sink = GST_MULTIUDPSINK (object);
+
+  g_mutex_free (sink->client_lock);
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_multiudpsink_get_times (GstBaseSink * sink, GstBuffer * buffer,
+    GstClockTime * start, GstClockTime * end)
+{
+  *start = GST_BUFFER_TIMESTAMP (buffer);
+  *end = *start + GST_BUFFER_DURATION (buffer);
+}
+
+static GstFlowReturn
+gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
+{
+  GstMultiUDPSink *sink;
+  gint ret, size;
+  guint8 *data;
+  GList *clients;
+
+  sink = GST_MULTIUDPSINK (bsink);
+
+  size = GST_BUFFER_SIZE (buffer);
+  data = GST_BUFFER_DATA (buffer);
+
+  GST_DEBUG ("about to send %d bytes", size);
+
+  g_mutex_lock (sink->client_lock);
+  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+    GstUDPClient *client;
+
+    client = (GstUDPClient *) clients->data;
+    GST_DEBUG ("sending %d bytes to client %p", size, client);
+
+    while (TRUE) {
+      ret = sendto (*client->sock, data, size, 0,
+          (struct sockaddr *) &client->theiraddr, sizeof (client->theiraddr));
+
+      if (ret < 0) {
+        if (errno != EINTR && errno != EAGAIN) {
+          goto send_error;
+        }
+      } else
+        break;
+    }
+  }
+  g_mutex_unlock (sink->client_lock);
+
+  GST_DEBUG ("sent %d bytes", size);
+
+  return GST_FLOW_OK;
+
+send_error:
+  {
+    g_mutex_unlock (sink->client_lock);
+    GST_DEBUG ("got send error %s (%d)", g_strerror (errno), errno);
+    return GST_FLOW_ERROR;
+  }
+}
+
+static void
+gst_multiudpsink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstMultiUDPSink *udpsink;
+
+  udpsink = GST_MULTIUDPSINK (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
+    GParamSpec * pspec)
+{
+  GstMultiUDPSink *udpsink;
+
+  udpsink = GST_MULTIUDPSINK (object);
+
+  switch (prop_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+
+/* create a socket for sending to remote machine */
+static gboolean
+gst_multiudpsink_init_send (GstMultiUDPSink * sink)
+{
+  guint bc_val;
+  gint ret;
+
+  /* create sender socket */
+  if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
+    goto no_socket;
+
+  bc_val = 1;
+  if ((ret =
+          setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
+              sizeof (bc_val))) < 0)
+    goto no_broadcast;
+
+  return TRUE;
+
+  /* ERRORS */
+no_socket:
+  {
+    perror ("socket");
+    return FALSE;
+  }
+no_broadcast:
+  {
+    perror ("setsockopt");
+    return FALSE;
+  }
+}
+
+static void
+gst_multiudpsink_close (GstMultiUDPSink * sink)
+{
+  close (sink->sock);
+}
+
+void
+gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
+{
+  struct hostent *he;
+  struct in_addr addr;
+  struct ip_mreq multi_addr;
+  GstUDPClient *client;
+
+  client = g_new0 (GstUDPClient, 1);
+  client->host = g_strdup (host);
+  client->port = port;
+  client->sock = &sink->sock;
+
+  memset (&client->theiraddr, 0, sizeof (client->theiraddr));
+  client->theiraddr.sin_family = AF_INET;       /* host byte order */
+  client->theiraddr.sin_port = htons (port);    /* short, network byte order */
+
+  /* if its an IP address */
+  if (inet_aton (host, &addr)) {
+    /* check if its a multicast address */
+    if ((ntohl (addr.s_addr) & 0xe0000000) == 0xe0000000) {
+      client->multi_addr.imr_multiaddr.s_addr = addr.s_addr;
+      client->multi_addr.imr_interface.s_addr = INADDR_ANY;
+
+      client->theiraddr.sin_addr = multi_addr.imr_multiaddr;
+
+      /* Joining the multicast group */
+      /* FIXME, can we use multicast and unicast over the same
+       * socket? if not, search for socket of this multicast group or
+       * create a new one. */
+      setsockopt (sink->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &multi_addr,
+          sizeof (multi_addr));
+    } else {
+      client->theiraddr.sin_addr = *((struct in_addr *) &addr);
+    }
+  }
+  /* we dont need to lookup for localhost */
+  else if (strcmp (host, "localhost") == 0 && inet_aton ("127.0.0.1", &addr)) {
+    client->theiraddr.sin_addr = *((struct in_addr *) &addr);
+  }
+  /* if its a hostname */
+  else if ((he = gethostbyname (host))) {
+    client->theiraddr.sin_addr = *((struct in_addr *) he->h_addr);
+  } else {
+    goto host_error;
+  }
+
+  g_mutex_lock (sink->client_lock);
+  sink->clients = g_list_prepend (sink->clients, client);
+  g_mutex_unlock (sink->client_lock);
+
+  return;
+
+  /* ERRORS */
+host_error:
+  {
+    GST_DEBUG ("hostname lookup error?");
+    g_free (client->host);
+    g_free (client);
+    return;
+  }
+}
+
+void
+gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
+{
+}
+
+void
+gst_multiudpsink_clear (GstMultiUDPSink * sink)
+{
+}
+
+GValueArray *
+gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
+    gint port)
+{
+  return NULL;
+}
+
+static GstElementStateReturn
+gst_multiudpsink_change_state (GstElement * element)
+{
+  GstElementStateReturn ret;
+  GstMultiUDPSink *sink;
+  gint transition;
+
+  sink = GST_MULTIUDPSINK (element);
+  transition = GST_STATE_TRANSITION (element);
+
+  switch (transition) {
+    case GST_STATE_READY_TO_PAUSED:
+      if (!gst_multiudpsink_init_send (sink))
+        goto no_init;
+      break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  switch (transition) {
+    case GST_STATE_PAUSED_TO_READY:
+      gst_multiudpsink_close (sink);
+      break;
+    default:
+      break;
+  }
+  return ret;
+
+  /* ERRORS */
+no_init:
+  {
+    return GST_STATE_FAILURE;
+  }
+}
diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h
new file mode 100644 (file)
index 0000000..c5b22dd
--- /dev/null
@@ -0,0 +1,96 @@
+/* GStreamer
+ * Copyright (C) <2005> Wim Taymand <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_MULTIUDPSINK_H__
+#define __GST_MULTIUDPSINK_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+
+G_BEGIN_DECLS
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <arpa/inet.h>
+#include "gstudp.h"
+
+#define GST_TYPE_MULTIUDPSINK          (gst_multiudpsink_get_type())
+#define GST_MULTIUDPSINK(obj)          (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTIUDPSINK,GstMultiUDPSink))
+#define GST_MULTIUDPSINK_CLASS(klass)  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTIUDPSINK,GstMultiUDPSink))
+#define GST_IS_MULTIUDPSINK(obj)       (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTIUDPSINK))
+#define GST_IS_MULTIUDPSINK_CLASS(obj)         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTIUDPSINK))
+
+typedef struct _GstMultiUDPSink GstMultiUDPSink;
+typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass;
+
+typedef struct {
+  int *sock;
+
+  struct sockaddr_in theiraddr;
+  struct ip_mreq multi_addr;
+
+  gchar *host;
+  gint port;
+} GstUDPClient;
+
+/* sends udp packets to multiple host/port pairs.
+ */
+struct _GstMultiUDPSink {
+  GstBaseSink parent;
+
+  int sock;
+
+  GMutex       *client_lock;
+  GList                *clients;
+};
+
+struct _GstMultiUDPSinkClass {
+  GstBaseSinkClass parent_class;
+
+  /* element methods */
+  void          (*add)          (GstMultiUDPSink *sink, const gchar *host, gint port);
+  void          (*remove)       (GstMultiUDPSink *sink, const gchar *host, gint port);
+  void          (*clear)        (GstMultiUDPSink *sink);
+  GValueArray*  (*get_stats)    (GstMultiUDPSink *sink, const gchar *host, gint port);
+
+  /* signals */
+  void                 (*client_added) (GstElement *element, const gchar *host, gint port);
+  void                 (*client_removed) (GstElement *element, const gchar *host, gint port);
+};
+
+GType gst_multiudpsink_get_type(void);
+
+void           gst_multiudpsink_add            (GstMultiUDPSink *sink, const gchar *host, gint port);
+void           gst_multiudpsink_remove         (GstMultiUDPSink *sink, const gchar *host, gint port);
+void           gst_multiudpsink_clear          (GstMultiUDPSink *sink);
+GValueArray*   gst_multiudpsink_get_stats      (GstMultiUDPSink *sink, const gchar *host, gint port);
+
+G_END_DECLS
+
+#endif /* __GST_MULTIUDPSINK_H__ */
diff --git a/gst/udp/gstudp-marshal.list b/gst/udp/gstudp-marshal.list
new file mode 100644 (file)
index 0000000..b53e79c
--- /dev/null
@@ -0,0 +1,2 @@
+VOID:STRING,INT
+BOXED:STRING,INT
index 5d67e43..e99c245 100644 (file)
@@ -22,6 +22,7 @@
 #endif
 
 #include "gstudpsrc.h"
+#include "gstmultiudpsink.h"
 #include "gstudpsink.h"
 
 static gboolean
@@ -31,6 +32,10 @@ plugin_init (GstPlugin * plugin)
           GST_TYPE_UDPSINK))
     return FALSE;
 
+  if (!gst_element_register (plugin, "multiudpsink", GST_RANK_NONE,
+          GST_TYPE_MULTIUDPSINK))
+    return FALSE;
+
   if (!gst_element_register (plugin, "udpsrc", GST_RANK_NONE, GST_TYPE_UDPSRC))
     return FALSE;
 
index 89103e9..f687937 100644 (file)
  * Boston, MA 02111-1307, USA.
  */
 
+#include "gstudp-enumtypes.h"
+#include <glib.h>
 
 #ifndef __GST_UDP_H__
 #define __GST_UDP_H__
 
-#ifdef __cplusplus
-extern "C"
-{
-#endif                         /* __cplusplus */
+G_BEGIN_DECLS
 
-  typedef enum
-  {
-    CONTROL_ZERO,
-    CONTROL_NONE,
-    CONTROL_UDP,
-    CONTROL_TCP
-  } Gst_UDP_Control;
+typedef enum
+{
+  CONTROL_ZERO,
+  CONTROL_NONE,
+  CONTROL_UDP,
+  CONTROL_TCP
+} GstUDPControl;
 
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_END_DECLS
 
 #endif /* __GST_UDP_H__ */
 
index 0d57cd6..56f2753 100644 (file)
@@ -1,5 +1,5 @@
 /* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -17,7 +17,6 @@
  * Boston, MA 02111-1307, USA.
  */
 
-
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 #define UDP_DEFAULT_HOST       "localhost"
 #define UDP_DEFAULT_PORT       4951
 
-static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
-    GST_PAD_SINK,
-    GST_PAD_ALWAYS,
-    GST_STATIC_CAPS_ANY);
-
 /* elementfactory information */
 static GstElementDetails gst_udpsink_details =
 GST_ELEMENT_DETAILS ("UDP packet sender",
     "Sink/Network",
     "Send data over the network via UDP",
-    "Wim Taymans <wim.taymans@chello.be>");
+    "Wim Taymans <wim@fluendo.com>");
 
 /* UDPSink signals and args */
 enum
@@ -47,9 +41,10 @@ enum
 
 enum
 {
-  ARG_0,
-  ARG_HOST,
-  ARG_PORT,
+  PROP_0,
+  PROP_HOST,
+  PROP_PORT,
+  PROP_URI,
   /* FILL ME */
 };
 
@@ -57,12 +52,6 @@ static void gst_udpsink_base_init (gpointer g_class);
 static void gst_udpsink_class_init (GstUDPSink * klass);
 static void gst_udpsink_init (GstUDPSink * udpsink);
 
-static void gst_udpsink_get_times (GstBaseSink * sink, GstBuffer * buffer,
-    GstClockTime * start, GstClockTime * end);
-static GstFlowReturn gst_udpsink_render (GstBaseSink * sink,
-    GstBuffer * buffer);
-static GstElementStateReturn gst_udpsink_change_state (GstElement * element);
-
 static void gst_udpsink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_udpsink_get_property (GObject * object, guint prop_id,
@@ -92,8 +81,8 @@ gst_udpsink_get_type (void)
     };
 
     udpsink_type =
-        g_type_register_static (GST_TYPE_BASESINK, "GstUDPSink", &udpsink_info,
-        0);
+        g_type_register_static (GST_TYPE_MULTIUDPSINK, "GstUDPSink",
+        &udpsink_info, 0);
   }
   return udpsink_type;
 }
@@ -103,9 +92,6 @@ gst_udpsink_base_init (gpointer g_class)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&sink_template));
-
   gst_element_class_set_details (element_class, &gst_udpsink_details);
 }
 
@@ -120,23 +106,18 @@ gst_udpsink_class_init (GstUDPSink * klass)
   gstelement_class = (GstElementClass *) klass;
   gstbasesink_class = (GstBaseSinkClass *) klass;
 
-  parent_class = g_type_class_ref (GST_TYPE_BASESINK);
+  parent_class = g_type_class_ref (GST_TYPE_MULTIUDPSINK);
 
   gobject_class->set_property = gst_udpsink_set_property;
   gobject_class->get_property = gst_udpsink_get_property;
 
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HOST,
       g_param_spec_string ("host", "host",
           "The host/IP/Multicast group to send the packets to",
           UDP_DEFAULT_HOST, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
       g_param_spec_int ("port", "port", "The port to send the packets to",
           0, 32768, UDP_DEFAULT_PORT, G_PARAM_READWRITE));
-
-  gstelement_class->change_state = gst_udpsink_change_state;
-
-  gstbasesink_class->get_times = gst_udpsink_get_times;
-  gstbasesink_class->render = gst_udpsink_render;
 }
 
 
@@ -145,45 +126,14 @@ gst_udpsink_init (GstUDPSink * udpsink)
 {
   udpsink->host = g_strdup (UDP_DEFAULT_HOST);
   udpsink->port = UDP_DEFAULT_PORT;
+  gst_multiudpsink_add (GST_MULTIUDPSINK (udpsink), udpsink->host,
+      udpsink->port);
 }
 
-static void
-gst_udpsink_get_times (GstBaseSink * sink, GstBuffer * buffer,
-    GstClockTime * start, GstClockTime * end)
+static gboolean
+gst_udpsink_set_uri (GstUDPSink * sink, const gchar * uri)
 {
-  *start = GST_BUFFER_TIMESTAMP (buffer);
-  *end = *start + GST_BUFFER_DURATION (buffer);
-}
-
-static GstFlowReturn
-gst_udpsink_render (GstBaseSink * sink, GstBuffer * buffer)
-{
-  GstUDPSink *udpsink;
-  gint ret, size;
-  guint8 *data;
-
-  udpsink = GST_UDPSINK (sink);
-
-  size = GST_BUFFER_SIZE (buffer);
-  data = GST_BUFFER_DATA (buffer);
-
-  while (TRUE) {
-    ret = sendto (udpsink->sock, data, size, 0,
-        (struct sockaddr *) &udpsink->theiraddr, sizeof (udpsink->theiraddr));
-
-    if (ret < 0) {
-      if (errno != EINTR && errno != EAGAIN)
-        goto send_error;
-    } else
-      break;
-  }
-  return GST_FLOW_OK;
-
-send_error:
-  {
-    GST_DEBUG ("got send error");
-    return GST_FLOW_ERROR;
-  }
+  return FALSE;
 }
 
 static void
@@ -194,21 +144,25 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value,
 
   udpsink = GST_UDPSINK (object);
 
+  /* remove old host */
+  gst_multiudpsink_remove (GST_MULTIUDPSINK (udpsink),
+      udpsink->host, udpsink->port);
+
   switch (prop_id) {
-    case ARG_HOST:
-      if (udpsink->host != NULL)
-        g_free (udpsink->host);
-      if (g_value_get_string (value) == NULL)
-        udpsink->host = NULL;
-      else
-        udpsink->host = g_strdup (g_value_get_string (value));
+    case PROP_HOST:
+      g_free (udpsink->host);
+      udpsink->host = g_value_dup_string (value);
       break;
-    case ARG_PORT:
+    case PROP_PORT:
       udpsink->port = g_value_get_int (value);
       break;
     default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
+  /* add new host */
+  gst_multiudpsink_add (GST_MULTIUDPSINK (udpsink),
+      udpsink->host, udpsink->port);
 }
 
 static void
@@ -220,10 +174,10 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value,
   udpsink = GST_UDPSINK (object);
 
   switch (prop_id) {
-    case ARG_HOST:
+    case PROP_HOST:
       g_value_set_string (value, udpsink->host);
       break;
-    case ARG_PORT:
+    case PROP_PORT:
       g_value_set_int (value, udpsink->port);
       break;
     default:
@@ -232,104 +186,47 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value,
   }
 }
 
+/*** GSTURIHANDLER INTERFACE *************************************************/
 
-/* create a socket for sending to remote machine */
-static gboolean
-gst_udpsink_init_send (GstUDPSink * sink)
+static guint
+gst_udpsink_uri_get_type (void)
 {
-  struct hostent *he;
-  struct in_addr addr;
-  guint bc_val;
-
-  memset (&sink->theiraddr, 0, sizeof (sink->theiraddr));
-  sink->theiraddr.sin_family = AF_INET; /* host byte order */
-  sink->theiraddr.sin_port = htons (sink->port);        /* short, network byte order */
-
-  /* if its an IP address */
-  if (inet_aton (sink->host, &addr)) {
-    /* check if its a multicast address */
-    if ((ntohl (addr.s_addr) & 0xe0000000) == 0xe0000000) {
-      sink->multi_addr.imr_multiaddr.s_addr = addr.s_addr;
-      sink->multi_addr.imr_interface.s_addr = INADDR_ANY;
-
-      sink->theiraddr.sin_addr = sink->multi_addr.imr_multiaddr;
-
-      /* Joining the multicast group */
-      setsockopt (sink->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &sink->multi_addr,
-          sizeof (sink->multi_addr));
-    }
-
-    else {
-      sink->theiraddr.sin_addr = *((struct in_addr *) &addr);
-    }
-  }
-
-  /* we dont need to lookup for localhost */
-  else if (strcmp (sink->host, UDP_DEFAULT_HOST) == 0 &&
-      inet_aton ("127.0.0.1", &addr)) {
-    sink->theiraddr.sin_addr = *((struct in_addr *) &addr);
-  }
-
-  /* if its a hostname */
-  else if ((he = gethostbyname (sink->host))) {
-    sink->theiraddr.sin_addr = *((struct in_addr *) he->h_addr);
-  }
-
-  else {
-    perror ("hostname lookup error?");
-    return FALSE;
-  }
-
-  if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) {
-    perror ("socket");
-    return FALSE;
-  }
-
-  bc_val = 1;
-  setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val));
-
-  return TRUE;
+  return GST_URI_SINK;
 }
-
-static void
-gst_udpsink_close (GstUDPSink * sink)
+static gchar **
+gst_udpsink_uri_get_protocols (void)
 {
-  close (sink->sock);
+  static gchar *protocols[] = { "udp", NULL };
+
+  return protocols;
 }
 
-static GstElementStateReturn
-gst_udpsink_change_state (GstElement * element)
+static const gchar *
+gst_udpsink_uri_get_uri (GstURIHandler * handler)
 {
-  GstElementStateReturn ret;
-  GstUDPSink *sink;
-  gint transition;
-
-  sink = GST_UDPSINK (element);
-  transition = GST_STATE_TRANSITION (element);
+  GstUDPSink *sink = GST_UDPSINK (handler);
 
-  switch (transition) {
-    case GST_STATE_READY_TO_PAUSED:
-      if (!gst_udpsink_init_send (sink))
-        goto no_init;
-      break;
-    default:
-      break;
-  }
+  return g_strdup (sink->uri);
+}
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+static gboolean
+gst_udpsink_uri_set_uri (GstURIHandler * handler, const gchar * uri)
+{
+  gboolean ret;
+  GstUDPSink *sink = GST_UDPSINK (handler);
 
-  switch (transition) {
-    case GST_STATE_PAUSED_TO_READY:
-      gst_udpsink_close (sink);
-      break;
-    default:
-      break;
-  }
+  ret = gst_udpsink_set_uri (sink, uri);
 
   return ret;
+}
 
-no_init:
-  {
-    return GST_STATE_FAILURE;
-  }
+static void
+gst_udpsink_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+  GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+
+  iface->get_type = gst_udpsink_uri_get_type;
+  iface->get_protocols = gst_udpsink_uri_get_protocols;
+  iface->get_uri = gst_udpsink_uri_get_uri;
+  iface->set_uri = gst_udpsink_uri_set_uri;
 }
index e5480c3..57abcca 100644 (file)
@@ -22,7 +22,7 @@
 #define __GST_UDPSINK_H__
 
 #include <gst/gst.h>
-#include <gst/base/gstbasesink.h>
+#include "gstmultiudpsink.h"
 
 G_BEGIN_DECLS
 
@@ -51,18 +51,15 @@ typedef struct _GstUDPSink GstUDPSink;
 typedef struct _GstUDPSinkClass GstUDPSinkClass;
 
 struct _GstUDPSink {
-  GstBaseSink parent;
-
-  int sock;
-  struct sockaddr_in theiraddr;
-  struct ip_mreq multi_addr;
+  GstMultiUDPSink parent;
 
+  gchar *uri;
   gint port;
   gchar *host;
 };
 
 struct _GstUDPSinkClass {
-  GstBaseSinkClass parent_class;
+  GstMultiUDPSinkClass parent_class;
 };
 
 GType gst_udpsink_get_type(void);
index 76c4b73..ffec6a0 100644 (file)
@@ -1,5 +1,5 @@
 /* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 
 #include "gstudpsrc.h"
 #include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_FIONREAD_IN_SYS_FILIO
+#include <sys/filio.h>
+#endif
+
+GST_DEBUG_CATEGORY (udpsrc_debug);
+#define GST_CAT_DEFAULT (udpsrc_debug)
+
+/* the select call is also performed on the control sockets, that way
+ * we can send special commands to unblock or restart the select call */
+#define CONTROL_RESTART        'R'      /* restart the select call */
+#define CONTROL_STOP           'S'      /* stop the select call */
+#define CONTROL_SOCKETS(src)   src->control_sock
+#define WRITE_SOCKET(src)      src->control_sock[1]
+#define READ_SOCKET(src)       src->control_sock[0]
+
+#define SEND_COMMAND(src, command)                     \
+G_STMT_START {                                         \
+  unsigned char c; c = command;                        \
+  write (WRITE_SOCKET(src), &c, 1);            \
+} G_STMT_END
+
+#define READ_COMMAND(src, command, res)                \
+G_STMT_START {                                 \
+  res = read(READ_SOCKET(src), &command, 1);    \
+} G_STMT_END
 
 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -35,7 +62,7 @@ static GstElementDetails gst_udpsrc_details =
 GST_ELEMENT_DETAILS ("UDP packet receiver",
     "Source/Network",
     "Receive data over the network via UDP",
-    "Wim Taymans <wim.taymans@chello.be>");
+    "Wim Taymans <wim@fluendo.com>");
 
 /* UDPSrc signals and args */
 enum
@@ -66,6 +93,7 @@ static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
 static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
 static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
 static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
+static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
 
 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -153,7 +181,11 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
 
   gstbasesrc_class->start = gst_udpsrc_start;
   gstbasesrc_class->stop = gst_udpsrc_stop;
+  gstbasesrc_class->unlock = gst_udpsrc_unlock;
+
   gstpushsrc_class->create = gst_udpsrc_create;
+
+  GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
 }
 
 static void
@@ -172,30 +204,82 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
   GstBuffer *outbuf;
   struct sockaddr_in tmpaddr;
   socklen_t len;
-  gint numbytes;
   fd_set read_fds;
   guint max_sock;
   gchar *pktdata;
   gint pktsize;
+  gint readsize;
+  gint ret;
+  gboolean try_again;
 
   udpsrc = GST_UDPSRC (psrc);
 
   FD_ZERO (&read_fds);
   FD_SET (udpsrc->sock, &read_fds);
-  max_sock = udpsrc->sock;
+  FD_SET (READ_SOCKET (udpsrc), &read_fds);
+  max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
 
-  /* FIXME, add another socket to unblock */
-  if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0)
-    goto select_error;
+  do {
+    gboolean stop;
 
-  pktdata = g_malloc (24000);
-  pktsize = 24000;
+    try_again = FALSE;
+    stop = FALSE;
+
+    GST_LOG_OBJECT (udpsrc, "doing select");
+    ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL);
+    GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
+    if (ret <= 0) {
+      if (errno != EAGAIN && errno != EINTR)
+        goto select_error;
+      else
+        try_again = TRUE;
+    } else {
+      /* got control message */
+      if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds)) {
+        while (TRUE) {
+          gchar command;
+          int res;
+
+          READ_COMMAND (udpsrc, command, res);
+          if (res < 0) {
+            GST_LOG_OBJECT (udpsrc, "no more commands");
+            /* no more commands */
+            break;
+          }
+
+          switch (command) {
+            case CONTROL_STOP:
+              /* break out of the select loop */
+              GST_LOG_OBJECT (udpsrc, "stop");
+              /* stop this function */
+              stop = TRUE;
+              break;
+            default:
+              GST_WARNING_OBJECT (udpsrc, "unkown");
+              g_warning ("multiudpsink: unknown control message received");
+              break;
+          }
+        }
+      }
+    }
+    if (stop)
+      goto stopped;
+  } while (try_again);
+
+  /* ask how much is available for reading on the socket */
+  if ((ret = ioctl (udpsrc->sock, FIONREAD, &readsize)) < 0)
+    goto ioctl_failed;
+
+  GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", readsize);
+
+  pktdata = g_malloc (readsize);
+  pktsize = readsize;
 
   len = sizeof (struct sockaddr);
   while (TRUE) {
-    numbytes = recvfrom (udpsrc->sock, pktdata, pktsize,
+    ret = recvfrom (udpsrc->sock, pktdata, pktsize,
         0, (struct sockaddr *) &tmpaddr, &len);
-    if (numbytes < 0) {
+    if (ret < 0) {
       if (errno != EAGAIN && errno != EINTR)
         goto receive_error;
     } else
@@ -204,7 +288,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 
   outbuf = gst_buffer_new ();
   GST_BUFFER_DATA (outbuf) = pktdata;
-  GST_BUFFER_SIZE (outbuf) = numbytes;
+  GST_BUFFER_SIZE (outbuf) = ret;
 
   *buf = outbuf;
 
@@ -212,13 +296,26 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 
 select_error:
   {
-    GST_DEBUG ("got select error");
+    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+        ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
+    return GST_FLOW_ERROR;
+  }
+stopped:
+  {
+    GST_DEBUG ("stop called");
+    return GST_FLOW_WRONG_STATE;
+  }
+ioctl_failed:
+  {
+    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+        ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
     return GST_FLOW_ERROR;
   }
 receive_error:
   {
-    gst_buffer_unref (outbuf);
-    GST_DEBUG ("got receive error");
+    g_free (pktdata);
+    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+        ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
     return GST_FLOW_ERROR;
   }
 }
@@ -250,7 +347,8 @@ gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
 wrong_protocol:
   {
     g_free (protocol);
-    GST_DEBUG ("error parsing uri %s", uri);
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("error parsing uri %s: wrong protocol", uri));
     return FALSE;
   }
 }
@@ -313,39 +411,53 @@ static gboolean
 gst_udpsrc_start (GstBaseSrc * bsrc)
 {
   guint bc_val;
-  gint reuse = 1;
+  gint reuse;
   struct sockaddr_in my_addr;
   int len, port;
   GstUDPSrc *src;
+  gint ret;
 
   src = GST_UDPSRC (bsrc);
 
-  memset (&src->myaddr, 0, sizeof (src->myaddr));
-  src->myaddr.sin_family = AF_INET;     /* host byte order */
-  src->myaddr.sin_port = htons (src->port);     /* short, network byte order */
-  src->myaddr.sin_addr.s_addr = INADDR_ANY;
+  if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0)
+    goto no_socket_pair;
 
-  if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
+  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
+
+  if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
     goto no_socket;
 
-  if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
-          sizeof (reuse)) < 0)
+  src->sock = ret;
+
+  reuse = 1;
+  if ((ret =
+          setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
+              sizeof (reuse))) < 0)
     goto setsockopt_error;
 
-  if (bind (src->sock, (struct sockaddr *) &src->myaddr,
-          sizeof (src->myaddr)) < 0)
+  memset (&src->myaddr, 0, sizeof (src->myaddr));
+  src->myaddr.sin_family = AF_INET;     /* host byte order */
+  src->myaddr.sin_port = htons (src->port);     /* short, network byte order */
+  src->myaddr.sin_addr.s_addr = INADDR_ANY;
+
+  if ((ret =
+          bind (src->sock, (struct sockaddr *) &src->myaddr,
+              sizeof (src->myaddr))) < 0)
     goto bind_error;
 
   if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) {
     if (src->multi_addr.imr_multiaddr.s_addr) {
       src->multi_addr.imr_interface.s_addr = INADDR_ANY;
-      setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &src->multi_addr,
-          sizeof (src->multi_addr));
+      if ((ret =
+              setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+                  &src->multi_addr, sizeof (src->multi_addr))) < 0)
+        goto membership;
     }
   }
 
   len = sizeof (my_addr);
-  if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0)
+  if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0)
     goto getsockname_error;
 
   port = ntohs (my_addr.sin_port);
@@ -355,35 +467,85 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
   }
 
   bc_val = 1;
-  setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val));
+  if ((ret =
+          setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
+              sizeof (bc_val))) < 0)
+    goto no_broadcast;
+
   src->myaddr.sin_port = htons (src->port + 1);
 
   return TRUE;
 
   /* ERRORS */
+no_socket_pair:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+        ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno));
+    return FALSE;
+  }
 no_socket:
   {
-    GST_DEBUG ("no_socket");
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+        ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
     return FALSE;
   }
 setsockopt_error:
   {
-    GST_DEBUG ("setsockopt failed");
+    close (src->sock);
+    src->sock = -1;
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
     return FALSE;
   }
 bind_error:
   {
-    GST_DEBUG ("bind failed");
+    close (src->sock);
+    src->sock = -1;
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
+    return FALSE;
+  }
+membership:
+  {
+    close (src->sock);
+    src->sock = -1;
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
     return FALSE;
   }
 getsockname_error:
   {
-    GST_DEBUG ("getsockname failed");
+    close (src->sock);
+    src->sock = -1;
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
+    return FALSE;
+  }
+no_broadcast:
+  {
+    close (src->sock);
+    src->sock = -1;
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("could not configure socket for broadcast %d: %s (%d)", ret,
+            g_strerror (errno), errno));
     return FALSE;
   }
 }
 
 static gboolean
+gst_udpsrc_unlock (GstBaseSrc * bsrc)
+{
+  GstUDPSrc *src;
+
+  src = GST_UDPSRC (bsrc);
+
+  GST_DEBUG ("sending stop command");
+  SEND_COMMAND (src, CONTROL_STOP);
+
+  return TRUE;
+}
+
+static gboolean
 gst_udpsrc_stop (GstBaseSrc * bsrc)
 {
   GstUDPSrc *src;
index c39cbe7..cbdfa7c 100644 (file)
@@ -56,12 +56,12 @@ struct _GstUDPSrc {
 
   gchar *uri;
   int port;
-  int sock;
+
   gchar *multi_group;
-  gboolean multicast;
   gint ttl;
 
-  int control;
+  int sock;
+  int control_sock[2];
 
   struct sockaddr_in myaddr;
   struct ip_mreq multi_addr;