gst/udp/gstudpsrc.*: Add property to control automatic join/leave of multicast groups.
authorWim Taymans <wim.taymans@gmail.com>
Fri, 13 Jun 2008 11:54:05 +0000 (11:54 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Fri, 13 Jun 2008 11:54:05 +0000 (11:54 +0000)
Original commit message from CVS:
* gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_property),
(gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Add property to control automatic join/leave of multicast groups.
Add G_LIKELY.
Remove setting caps on buffers explicitly, basesrc does that for us now.
Improve debug info.
Convert some non-fatal error into warnings.
Use g_ntohs for better portability.
Leave multicast groups when stopping.
When using external sockets, use getsockname() on them to fill up the
addr structure before calling methods that use the structure.
Should all fix #536903.
API: GstUDPSrc::auto-multicast property

ChangeLog
gst/udp/gstudpsrc.c
gst/udp/gstudpsrc.h

index cdb2dddf69755e83912957924db7532fb39fb0c9..7929a211abaf5e1e7c5817ac0551e5ef6eaf1ae5 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,21 @@
+2008-06-13  Wim Taymans  <wim.taymans@collabora.co.uk>
+
+       * gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
+       (gst_udpsrc_create), (gst_udpsrc_set_property),
+       (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop):
+       * gst/udp/gstudpsrc.h:
+       Add property to control automatic join/leave of multicast groups.
+       Add G_LIKELY.
+       Remove setting caps on buffers explicitly, basesrc does that for us now.
+       Improve debug info.
+       Convert some non-fatal error into warnings.
+       Use g_ntohs for better portability.
+       Leave multicast groups when stopping.
+       When using external sockets, use getsockname() on them to fill up the
+       addr structure before calling methods that use the structure.
+       Should all fix #536903.
+       API: GstUDPSrc::auto-multicast property
+
 2008-06-13  Wim Taymans  <wim.taymans@collabora.co.uk>
 
        * gst/udp/gstudpnetutils.c: (gst_udp_is_multicast):
index 40d91537e9abd1a5d715ddc675aebba30e01162a..a9341344487e67ddbab180b085647e8684b9bf97 100644 (file)
@@ -170,10 +170,12 @@ GST_ELEMENT_DETAILS ("UDP packet receiver",
 #define UDP_DEFAULT_SKIP_FIRST_BYTES   0
 #define UDP_DEFAULT_CLOSEFD            TRUE
 #define UDP_DEFAULT_SOCK                -1
+#define UDP_DEFAULT_AUTO_MULTICAST     TRUE
 
 enum
 {
   PROP_0,
+
   PROP_PORT,
   PROP_MULTICAST_GROUP,
   PROP_URI,
@@ -183,7 +185,10 @@ enum
   PROP_TIMEOUT,
   PROP_SKIP_FIRST_BYTES,
   PROP_CLOSEFD,
-  PROP_SOCK
+  PROP_SOCK,
+  PROP_AUTO_MULTICAST,
+
+  PROP_LAST
 };
 
 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@@ -283,6 +288,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
       g_param_spec_int ("sock", "Socket Handle",
           "Socket currently in use for UDP reception. (-1 = no socket)",
           -1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
+  g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
+      g_param_spec_boolean ("auto-multicast", "Auto Multicast",
+          "Automatically join/leave multicast groups",
+          UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
 
   gstbasesrc_class->start = gst_udpsrc_start;
   gstbasesrc_class->stop = gst_udpsrc_stop;
@@ -298,7 +307,6 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
 {
   WSA_STARTUP (udpsrc);
 
-  gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
   udpsrc->port = UDP_DEFAULT_PORT;
   udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
   udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
@@ -308,9 +316,15 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
   udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
   udpsrc->externalfd = (udpsrc->sockfd != -1);
-
+  udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
   udpsrc->sock.fd = UDP_DEFAULT_SOCK;
+
+  /* configure basesrc to be a live source */
+  gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
+  /* make basesrc output a segment in time */
   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
+  /* make basesrc set timestamps on outgoing buffers based on the running_time
+   * when they were captured */
   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
 }
 
@@ -363,12 +377,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
   gint ret;
   gboolean try_again;
 
-  udpsrc = GST_UDPSRC (psrc);
+  udpsrc = GST_UDPSRC_CAST (psrc);
 
 retry:
   /* quick check, avoid going in select when we already have data */
   readsize = 0;
-  if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+  if (G_UNLIKELY ((ret =
+              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
     goto ioctl_failed;
 
   if (readsize > 0)
@@ -388,7 +403,7 @@ retry:
 
     ret = gst_poll_wait (udpsrc->fdset, timeout);
     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
-    if (ret < 0) {
+    if (G_UNLIKELY (ret < 0)) {
       if (errno == EBUSY)
         goto stopped;
 #ifdef G_OS_WIN32
@@ -399,7 +414,7 @@ retry:
         goto select_error;
 #endif
       try_again = TRUE;
-    } else if (ret == 0) {
+    } else if (G_UNLIKELY (ret == 0)) {
       /* timeout, post element message */
       gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
           gst_message_new_element (GST_OBJECT_CAST (udpsrc),
@@ -407,20 +422,21 @@ retry:
                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
       try_again = TRUE;
     }
-  } while (try_again);
+  } while (G_UNLIKELY (try_again));
 
   /* ask how much is available for reading on the socket, this should be exactly
    * one UDP packet. We will check the return value, though, because in some
    * case it can return 0 and we don't want a 0 sized buffer. */
   readsize = 0;
-  if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+  if (G_UNLIKELY ((ret =
+              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
     goto ioctl_failed;
 
   /* if we get here and there is nothing to read from the socket, the select got
-   * woken up by activity on the socket but it was not a read. We how someone
+   * woken up by activity on the socket but it was not a read. We know someone
    * will also do something with the socket so that we don't go into an infinite
    * loop in the select(). */
-  if (!readsize)
+  if (G_UNLIKELY (!readsize))
     goto retry;
 
 no_select:
@@ -433,7 +449,7 @@ no_select:
     len = sizeof (struct sockaddr);
     ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
         0, (struct sockaddr *) &tmpaddr, &len);
-    if (ret < 0) {
+    if (G_UNLIKELY (ret < 0)) {
 #ifdef G_OS_WIN32
       /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
        * generated a "port unreachable" ICMP response. We ignore that and try
@@ -458,7 +474,7 @@ no_select:
   GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
 
   /* patch pktdata and len when stripping off the headers */
-  if (udpsrc->skip_first_bytes != 0) {
+  if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
     if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
       goto skip_error;
 
@@ -490,9 +506,6 @@ no_select:
       errno = EAFNOSUPPORT;
       goto receive_error;
   }
-
-  gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps);
-
   GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
 
   *buf = GST_BUFFER_CAST (outbuf);
@@ -647,6 +660,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
     case PROP_CLOSEFD:
       udpsrc->closefd = g_value_get_boolean (value);
       break;
+    case PROP_AUTO_MULTICAST:
+      udpsrc->auto_multicast = g_value_get_boolean (value);
+      break;
     default:
       break;
   }
@@ -689,6 +705,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_SOCK:
       g_value_set_int (value, udpsrc->sock.fd);
       break;
+    case PROP_AUTO_MULTICAST:
+      g_value_set_boolean (value, udpsrc->auto_multicast);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -701,20 +720,22 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
 {
   guint bc_val;
   gint reuse;
-  struct sockaddr_storage my_addr;
-  guint len;
   int port;
   GstUDPSrc *src;
   gint ret;
   int rcvsize;
+  guint len;
 
   src = GST_UDPSRC (bsrc);
 
   if (src->sockfd == -1) {
     /* need to allocate a socket */
+    GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group,
+        src->port);
     if ((ret =
             gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0)
       goto getaddrinfo_error;
+
     if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
       goto no_socket;
 
@@ -732,22 +753,18 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
                 sizeof (src->myaddr))) < 0)
       goto bind_error;
   } else {
-    /* we use the configured socket */
+    GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
+    /* we use the configured socket, try to get some info about it */
+    len = sizeof (src->myaddr);
+    if ((ret =
+            getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
+                &len)) < 0)
+      goto getsockname_error;
+
     src->sock.fd = src->sockfd;
     src->externalfd = TRUE;
   }
 
-  if (gst_udp_is_multicast (&src->myaddr)) {
-    ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
-    if (ret < 0)
-      goto membership;
-  }
-
-  len = sizeof (my_addr);
-  if ((ret =
-          getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
-    goto getsockname_error;
-
   len = sizeof (rcvsize);
   if (src->buffer_size != 0) {
     rcvsize = src->buffer_size;
@@ -759,8 +776,11 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
     ret =
         setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
         len);
-    if (ret != 0)
-      goto udpbuffer_error;
+    if (ret != 0) {
+      GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+          ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
+              rcvsize, ret, g_strerror (errno), errno));
+    }
   }
 
   /* read the value of the receive buffer. Note that on linux this returns 2x the
@@ -775,21 +795,29 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
 
   bc_val = 1;
   if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
-              sizeof (bc_val))) < 0)
-    goto no_broadcast;
+              sizeof (bc_val))) < 0) {
+    GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+        ("could not configure socket for broadcast %d: %s (%d)", ret,
+            g_strerror (errno), errno));
+  }
+
+  if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+    GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group);
+    ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
+    if (ret < 0)
+      goto membership;
+  }
 
   /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
    * follows ss_family on both */
-  port = ntohs (((struct sockaddr_in *) &my_addr)->sin_port);
+  port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
   GST_DEBUG_OBJECT (src, "bound, on port %d", port);
   if (port != src->port) {
     src->port = port;
-    GST_DEBUG_OBJECT (src, "notifying %d", port);
+    GST_DEBUG_OBJECT (src, "notifying port %d", port);
     g_object_notify (G_OBJECT (src), "port");
   }
 
-  ((struct sockaddr_in *) &src->myaddr)->sin_port = htons (src->port + 1);
-
   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
     goto no_fdset;
 
@@ -839,22 +867,6 @@ getsockname_error:
         ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
     return FALSE;
   }
-udpbuffer_error:
-  {
-    CLOSE_IF_REQUESTED (src);
-    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("Could not create a buffer of the size requested, %d: %s (%d)", ret,
-            g_strerror (errno), errno));
-    return FALSE;
-  }
-no_broadcast:
-  {
-    CLOSE_IF_REQUESTED (src);
-    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
-        ("could not configure socket for broadcast %d: %s (%d)", ret,
-            g_strerror (errno), errno));
-    return FALSE;
-  }
 no_fdset:
   {
     CLOSE_IF_REQUESTED (src);
@@ -901,6 +913,10 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
   GST_DEBUG ("stopping, closing sockets");
 
   if (src->sock.fd >= 0) {
+    if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+      GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group);
+      gst_udp_leave_group (src->sock.fd, &src->myaddr);
+    }
     CLOSE_IF_REQUESTED (src);
   }
 
index 597a6a708fc847bc3c014362cf788b22ea794b18..cdf7b35e39ab911abc28111ffa8926fa6225e991 100644 (file)
@@ -44,6 +44,7 @@ G_BEGIN_DECLS
   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC))
 #define GST_IS_UDPSRC_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC))
+#define GST_UDPSRC_CAST(obj) ((GstUDPSrc *)(obj))
 
 typedef struct _GstUDPSrc GstUDPSrc;
 typedef struct _GstUDPSrcClass GstUDPSrcClass;
@@ -62,6 +63,7 @@ struct _GstUDPSrc {
   gint       skip_first_bytes;
   int        sockfd;
   gboolean   closefd;
+  gboolean   auto_multicast;
 
   /* our sockets */
   GstPollFD  sock;