gst/udp/gstudpsrc.*: Port to GstPoll. See #505417.
authorPeter Kjellerstedt <pkj@axis.com>
Thu, 28 Feb 2008 11:51:24 +0000 (11:51 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 28 Feb 2008 11:51:24 +0000 (11:51 +0000)
Original commit message from CVS:
Patch by: Peter Kjellerstedt <pkj at axis com>
* gst/udp/gstudpsrc.c: (gst_udpsrc_init), (gst_udpsrc_create),
(gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_unlock),
(gst_udpsrc_unlock_stop), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Port to GstPoll. See #505417.

ChangeLog
gst/udp/gstudpsrc.c
gst/udp/gstudpsrc.h

index da4876f..a527a63 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,13 @@
+2008-02-28  Wim Taymans  <wim.taymans@collabora.co.uk>
+
+       Patch by: Peter Kjellerstedt <pkj at axis com>
+
+       * gst/udp/gstudpsrc.c: (gst_udpsrc_init), (gst_udpsrc_create),
+       (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_unlock),
+       (gst_udpsrc_unlock_stop), (gst_udpsrc_stop):
+       * gst/udp/gstudpsrc.h:
+       Port to GstPoll. See #505417.
+
 2008-02-28  Sebastian Dröge  <slomo@circular-chaos.org>
 
        * gst/law/mulaw-decode.c: (gst_mulawdec_chain):
index 8964f83..a727250 100644 (file)
@@ -141,29 +141,12 @@ typedef int socklen_t;
 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
 #define GST_CAT_DEFAULT (udpsrc_debug)
 
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock or restart the select call */
-#define CONTROL_RESTART        'R'      /* restart the select call */
-#define CONTROL_STOP           'S'      /* stop the select call */
-#define CONTROL_SOCKETS(src)   src->control_sock
-#define WRITE_SOCKET(src)      src->control_sock[1]
-#define READ_SOCKET(src)       src->control_sock[0]
-
-#define SEND_COMMAND(src, command, res)          \
-G_STMT_START {                                   \
-  unsigned char c; c = command;                  \
-  res = write (WRITE_SOCKET(src), &c, 1);        \
-} G_STMT_END
-
-#define READ_COMMAND(src, command, res)         \
-G_STMT_START {                                  \
-  res = read(READ_SOCKET(src), &command, 1);    \
-} G_STMT_END
-
 #define CLOSE_IF_REQUESTED(udpctx)                                        \
+G_STMT_START {                                                            \
   if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd))   \
-    CLOSE_SOCKET(udpctx->sock);                                           \
-  udpctx->sock = -1;
+    CLOSE_SOCKET(udpctx->sock.fd);                                        \
+  udpctx->sock.fd = -1;                                                   \
+} G_STMT_END
 
 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -326,9 +309,7 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
   udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
   udpsrc->externalfd = (udpsrc->sockfd != -1);
 
-  udpsrc->sock = UDP_DEFAULT_SOCK;
-  udpsrc->control_sock[0] = -1;
-  udpsrc->control_sock[1] = -1;
+  udpsrc->sock.fd = UDP_DEFAULT_SOCK;
   gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
   gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
 }
@@ -368,8 +349,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
   GstNetBuffer *outbuf;
   struct sockaddr_in tmpaddr;
   socklen_t len;
-  fd_set read_fds;
-  guint max_sock;
   guint8 *pktdata;
   gint pktsize;
 
@@ -378,6 +357,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 #elif defined G_OS_WIN32
   gulong readsize;
 #endif
+  GstClockTime timeout;
   gint ret;
   gboolean try_again;
 
@@ -386,49 +366,29 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
 retry:
   /* quick check, avoid going in select when we already have data */
   readsize = 0;
-  if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
+  if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
     goto ioctl_failed;
 
   if (readsize > 0)
     goto no_select;
 
-  do {
-    gboolean stop;
-    struct timeval timeval, *timeout;
-
-    FD_ZERO (&read_fds);
-    FD_SET (udpsrc->sock, &read_fds);
-#ifndef G_OS_WIN32
-    FD_SET (READ_SOCKET (udpsrc), &read_fds);
-#endif
-    max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
+  if (udpsrc->timeout > 0) {
+    timeout = udpsrc->timeout * GST_USECOND;
+  } else {
+    timeout = GST_CLOCK_TIME_NONE;
+  }
 
+  do {
     try_again = FALSE;
-    stop = FALSE;
 
     GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
         udpsrc->timeout);
 
-    if (udpsrc->timeout > 0) {
-      timeval.tv_sec = udpsrc->timeout / 1000000;
-      timeval.tv_usec = udpsrc->timeout % 1000000;
-      timeout = &timeval;
-    } else {
-      timeout = NULL;
-    }
-
-#ifdef G_OS_WIN32
-    if (((max_sock + 1) != READ_SOCKET (udpsrc)) ||
-        ((max_sock + 1) != WRITE_SOCKET (udpsrc))) {
-      ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
-    } else {
-      ret = 1;
-    }
-#else
-    ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
-#endif
+    ret = gst_poll_wait (udpsrc->fdset, timeout);
     GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
     if (ret < 0) {
+      if (errno == EBUSY)
+        goto stopped;
 #ifdef G_OS_WIN32
       if (WSAGetLastError () != WSAEINTR)
         goto select_error;
@@ -444,9 +404,6 @@ retry:
               gst_structure_new ("GstUDPSrcTimeout",
                   "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
       try_again = TRUE;
-    } else {
-      if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds))
-        goto stopped;
     }
   } while (try_again);
 
@@ -454,7 +411,7 @@ retry:
    * one UDP packet. We will check the return value, though, because in some
    * case it can return 0 and we don't want a 0 sized buffer. */
   readsize = 0;
-  if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
+  if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
     goto ioctl_failed;
 
   /* if we get here and there is nothing to read from the socket, the select got
@@ -472,7 +429,7 @@ no_select:
 
   while (TRUE) {
     len = sizeof (struct sockaddr);
-    ret = recvfrom (udpsrc->sock, pktdata, pktsize,
+    ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
         0, (struct sockaddr *) &tmpaddr, &len);
     if (ret < 0) {
       if (errno != EAGAIN && errno != EINTR)
@@ -690,7 +647,7 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
       g_value_set_boolean (value, udpsrc->closefd);
       break;
     case PROP_SOCK:
-      g_value_set_int (value, udpsrc->sock);
+      g_value_set_int (value, udpsrc->sock.fd);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -713,22 +670,6 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
 
   src = GST_UDPSRC (bsrc);
 
-#ifdef G_OS_WIN32
-  GST_DEBUG_OBJECT (src, "creating pipe");
-
-  /* This should work on UNIX too. PF_UNIX sockets replaced with pipe */
-  /* pipe( CONTROL_SOCKETS(src), 4096, _O_BINARY ) */
-  if ((ret = _pipe (CONTROL_SOCKETS (src), 4096, _O_BINARY)) < 0)
-    goto no_socket_pair;
-#else
-  GST_DEBUG_OBJECT (src, "creating socket pair");
-  if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0)
-    goto no_socket_pair;
-
-  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-#endif
-
   if (!inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr)))
     src->multi_addr.imr_multiaddr.s_addr = 0;
 
@@ -737,12 +678,12 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
     if ((ret = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
       goto no_socket;
 
-    src->sock = ret;
+    src->sock.fd = ret;
     src->externalfd = FALSE;
 
     reuse = 1;
     if ((ret =
-            setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
+            setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
                 sizeof (reuse))) < 0)
       goto setsockopt_error;
 
@@ -756,25 +697,26 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
       src->myaddr.sin_addr.s_addr = INADDR_ANY;
 
     GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
-    if ((ret = bind (src->sock, (struct sockaddr *) &src->myaddr,
+    if ((ret = bind (src->sock.fd, (struct sockaddr *) &src->myaddr,
                 sizeof (src->myaddr))) < 0)
       goto bind_error;
   } else {
     /* we use the configured socket */
-    src->sock = src->sockfd;
+    src->sock.fd = src->sockfd;
     src->externalfd = TRUE;
   }
 
   if (src->multi_addr.imr_multiaddr.s_addr) {
     src->multi_addr.imr_interface.s_addr = INADDR_ANY;
     if ((ret =
-            setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+            setsockopt (src->sock.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                 &src->multi_addr, sizeof (src->multi_addr))) < 0)
       goto membership;
   }
 
   len = sizeof (my_addr);
-  if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0)
+  if ((ret =
+          getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
     goto getsockname_error;
 
   len = sizeof (rcvsize);
@@ -785,7 +727,9 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
     /* set buffer size, Note that on Linux this is typically limited to a
      * maximum of around 100K. Also a minimum of 128 bytes is required on
      * Linux. */
-    ret = setsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, len);
+    ret =
+        setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
+        len);
     if (ret != 0)
       goto udpbuffer_error;
   }
@@ -793,14 +737,15 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
   /* read the value of the receive buffer. Note that on linux this returns 2x the
    * value we set because the kernel allocates extra memory for metadata.
    * The default on Linux is about 100K (which is about 50K without metadata) */
-  ret = getsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
+  ret =
+      getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
   if (ret == 0)
     GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
   else
     GST_DEBUG_OBJECT (src, "could not get udp buffer size");
 
   bc_val = 1;
-  if ((ret = setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
+  if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
               sizeof (bc_val))) < 0)
     goto no_broadcast;
 
@@ -814,15 +759,20 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
 
   src->myaddr.sin_port = htons (src->port + 1);
 
+#ifdef G_OS_WIN32
+  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, FALSE)) == NULL)
+    goto no_fdset;
+#else
+  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
+    goto no_fdset;
+#endif
+
+  gst_poll_add_fd (src->fdset, &src->sock);
+  gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
+
   return TRUE;
 
   /* ERRORS */
-no_socket_pair:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno));
-    return FALSE;
-  }
 no_socket:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
@@ -873,19 +823,25 @@ no_broadcast:
             g_strerror (errno), errno));
     return FALSE;
   }
+no_fdset:
+  {
+    CLOSE_IF_REQUESTED (src);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+        ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
+            errno));
+    return FALSE;
+  }
 }
 
 static gboolean
 gst_udpsrc_unlock (GstBaseSrc * bsrc)
 {
   GstUDPSrc *src;
-  gint res;
 
   src = GST_UDPSRC (bsrc);
 
-  GST_LOG_OBJECT (src, "sending stop command");
-  SEND_COMMAND (src, CONTROL_STOP, res);
-  GST_LOG_OBJECT (src, "sent stop command %d", res);
+  GST_LOG_OBJECT (src, "Flushing");
+  gst_poll_set_flushing (src->fdset, TRUE);
 
   return TRUE;
 }
@@ -897,21 +853,8 @@ gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
 
   src = GST_UDPSRC (bsrc);
 
-  GST_LOG_OBJECT (src, "clearing unlock command queue");
-
-  while (TRUE) {
-    gchar command;
-    int res;
-
-    GST_LOG_OBJECT (src, "reading command");
-
-    READ_COMMAND (src, command, res);
-    if (res <= 0) {
-      GST_LOG_OBJECT (src, "no more commands");
-      /* no more commands */
-      break;
-    }
-  }
+  GST_LOG_OBJECT (src, "No longer flushing");
+  gst_poll_set_flushing (src->fdset, FALSE);
 
   return TRUE;
 }
@@ -925,18 +868,13 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
 
   GST_DEBUG ("stopping, closing sockets");
 
-  if (src->sock != -1) {
+  if (src->sock.fd >= 0) {
     CLOSE_IF_REQUESTED (src);
   }
 
-  /* pipes on WIN32 else sockets */
-  if (src->control_sock[0] != -1) {
-    close (src->control_sock[0]);
-    src->control_sock[0] = -1;
-  }
-  if (src->control_sock[1] != -1) {
-    close (src->control_sock[1]);
-    src->control_sock[1] = -1;
+  if (src->fdset) {
+    gst_poll_free (src->fdset);
+    src->fdset = NULL;
   }
 
   WSA_CLEANUP (src);
index c672d2a..e736a59 100644 (file)
@@ -52,21 +52,21 @@ struct _GstUDPSrc {
   GstPushSrc parent;
 
   /* properties */
-  gchar   *uri;
-  int      port;
-  gchar   *multi_group;
-  gint     ttl;
-  GstCaps *caps;
-  gint     buffer_size;
-  guint64  timeout;
-  gint     skip_first_bytes;
-  int      sockfd;
-  gboolean closefd;
+  gchar     *uri;
+  int        port;
+  gchar     *multi_group;
+  gint       ttl;
+  GstCaps   *caps;
+  gint       buffer_size;
+  guint64    timeout;
+  gint       skip_first_bytes;
+  int        sockfd;
+  gboolean   closefd;
 
   /* our sockets */
-  int      sock;
-  int      control_sock[2];
-  gboolean externalfd;
+  GstPollFD  sock;
+  GstPoll   *fdset;
+  gboolean   externalfd;
 
   struct   sockaddr_in myaddr;
   struct   ip_mreq multi_addr;