gst/rtsp/gstrtspsrc.*: Add TCP timeout property and use it for all TCP connection.
authorWim Taymans <wim.taymans@gmail.com>
Fri, 18 May 2007 11:39:12 +0000 (11:39 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Fri, 18 May 2007 11:39:12 +0000 (11:39 +0000)
Original commit message from CVS:
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_set_property), (gst_rtspsrc_get_property),
(gst_rtspsrc_stream_configure_udp), (gst_rtspsrc_loop_interleaved),
(gst_rtspsrc_loop_udp), (gst_rtspsrc_try_send), (gst_rtspsrc_send),
(gst_rtspsrc_setup_streams), (gst_rtspsrc_open):
* gst/rtsp/gstrtspsrc.h:
Add TCP timeout property and use it for all TCP connection.
* gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
(rtsp_connection_write), (rtsp_connection_next_timeout),
(rtsp_connection_reset_timeout):
Make connect and writes cancelable and make them use the timeout.

ChangeLog
gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h
gst/rtsp/rtspconnection.c

index 09924a5..46a356d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,20 @@
 2007-05-18  Wim Taymans  <wim@fluendo.com>
 
+       * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
+       (gst_rtspsrc_set_property), (gst_rtspsrc_get_property),
+       (gst_rtspsrc_stream_configure_udp), (gst_rtspsrc_loop_interleaved),
+       (gst_rtspsrc_loop_udp), (gst_rtspsrc_try_send), (gst_rtspsrc_send),
+       (gst_rtspsrc_setup_streams), (gst_rtspsrc_open):
+       * gst/rtsp/gstrtspsrc.h:
+       Add TCP timeout property and use it for all TCP connection.
+
+       * gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
+       (rtsp_connection_write), (rtsp_connection_next_timeout),
+       (rtsp_connection_reset_timeout):
+       Make connect and writes cancelable and make them use the timeout.
+
+2007-05-18  Wim Taymans  <wim@fluendo.com>
+
        * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_send_keep_alive),
        (gst_rtspsrc_loop_interleaved), (gst_rtspsrc_loop_udp),
        (gst_rtspsrc_try_send), (gst_rtspsrc_send),
index d7743a9..8ce75ec 100644 (file)
@@ -145,6 +145,7 @@ enum
 #define DEFAULT_DEBUG           FALSE
 #define DEFAULT_RETRY           20
 #define DEFAULT_TIMEOUT         5000000
+#define DEFAULT_TCP_TIMEOUT     20000000
 #define DEFAULT_LATENCY_MS      3000
 
 enum
@@ -155,6 +156,7 @@ enum
   PROP_DEBUG,
   PROP_RETRY,
   PROP_TIMEOUT,
+  PROP_TCP_TIMEOUT,
   PROP_LATENCY,
 };
 
@@ -279,10 +281,16 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
       g_param_spec_uint64 ("timeout", "Timeout",
-          "Retry TCP transport after timeout microseconds (0 = disabled)",
+          "Retry TCP transport after UDP timeout microseconds (0 = disabled)",
           0, G_MAXUINT64, DEFAULT_TIMEOUT,
           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 
+  g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT,
+      g_param_spec_uint64 ("tcp-timeout", "TCP Timeout",
+          "Fail after timeout microseconds on TCP connections (0 = disabled)",
+          0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+
   g_object_class_install_property (gobject_class, PROP_LATENCY,
       g_param_spec_uint ("latency", "Buffer latency in ms",
           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
@@ -365,8 +373,16 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
       rtspsrc->retry = g_value_get_uint (value);
       break;
     case PROP_TIMEOUT:
-      rtspsrc->timeout = g_value_get_uint64 (value);
+      rtspsrc->udp_timeout = g_value_get_uint64 (value);
       break;
+    case PROP_TCP_TIMEOUT:
+    {
+      guint64 timeout = g_value_get_uint64 (value);
+
+      rtspsrc->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC;
+      rtspsrc->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC;
+      break;
+    }
     case PROP_LATENCY:
       rtspsrc->latency = g_value_get_uint (value);
       break;
@@ -398,8 +414,17 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
       g_value_set_uint (value, rtspsrc->retry);
       break;
     case PROP_TIMEOUT:
-      g_value_set_uint64 (value, rtspsrc->timeout);
+      g_value_set_uint64 (value, rtspsrc->udp_timeout);
+      break;
+    case PROP_TCP_TIMEOUT:
+    {
+      guint64 timeout;
+
+      timeout = rtspsrc->tcp_timeout.tv_sec * G_USEC_PER_SEC +
+          rtspsrc->tcp_timeout.tv_usec;
+      g_value_set_uint64 (value, timeout);
       break;
+    }
     case PROP_LATENCY:
       g_value_set_uint (value, rtspsrc->latency);
       break;
@@ -1688,7 +1713,8 @@ gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream,
     /* configure a timeout on the UDP port. When the timeout message is
      * posted, we assume UDP transport is not possible. We reconnect using TCP
      * if we can. */
-    g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", src->timeout, NULL);
+    g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", src->udp_timeout,
+        NULL);
 
     /* get output pad of the UDP source. */
     *outpad = gst_element_get_pad (stream->udpsrc[0], "src");
@@ -2137,7 +2163,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
 
   have_data = FALSE;
   do {
-    GTimeVal tv_timeout;
+    GTimeVal tv_timeout, *tv;
 
     /* get the next timeout interval */
     rtsp_connection_next_timeout (src->connection, &tv_timeout);
@@ -2149,9 +2175,14 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
       res = gst_rtspsrc_send_keep_alive (src);
     }
 
+    if ((src->tcp_timeout.tv_sec | src->tcp_timeout.tv_usec))
+      tv = &src->tcp_timeout;
+    else
+      tv = NULL;
+
     GST_DEBUG_OBJECT (src, "doing receive");
 
-    res = rtsp_connection_receive (src->connection, &message, NULL);
+    res = rtsp_connection_receive (src->connection, &message, tv);
 
     switch (res) {
       case RTSP_OK:
@@ -2425,7 +2456,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
   GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
       ("Could not receive any UDP packets for %.4f seconds, maybe your "
           "firewall is blocking it. Retrying using a TCP connection.",
-          gst_guint64_to_gdouble (src->timeout / 1000000)));
+          gst_guint64_to_gdouble (src->udp_timeout / 1000000)));
   /* we can try only TCP now */
   src->cur_protocols = RTSP_LOWER_TRANS_TCP;
 
@@ -2685,6 +2716,7 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
   RTSPResult res;
   RTSPStatusCode thecode;
   gchar *content_base = NULL;
+  GTimeVal *tv;
 
   if (src->extension && src->extension->before_send)
     src->extension->before_send (src->extension, request);
@@ -2694,13 +2726,18 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
   if (src->debug)
     rtsp_message_dump (request);
 
-  if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0)
+  if ((src->tcp_timeout.tv_sec | src->tcp_timeout.tv_usec))
+    tv = &src->tcp_timeout;
+  else
+    tv = NULL;
+
+  if ((res = rtsp_connection_send (src->connection, request, tv)) < 0)
     goto send_error;
 
   rtsp_connection_reset_timeout (src->connection);
 
 next:
-  if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0)
+  if ((res = rtsp_connection_receive (src->connection, response, tv)) < 0)
     goto receive_error;
 
   if (src->debug)
@@ -3338,7 +3375,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
 
   /* connect */
   GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location);
-  if ((res = rtsp_connection_connect (src->connection, NULL)) < 0)
+  if ((res = rtsp_connection_connect (src->connection, &src->tcp_timeout)) < 0)
     goto could_not_connect;
 
   /* create OPTIONS */
index 3bada06..caf9efe 100644 (file)
@@ -147,7 +147,8 @@ struct _GstRTSPSrc {
   RTSPLowerTrans   protocols;
   gboolean         debug;
   guint           retry;
-  guint64          timeout;
+  guint64          udp_timeout;
+  GTimeVal         tcp_timeout;
   guint            latency;
 
   /* state */
index cb81c72..086dddc 100644 (file)
@@ -172,6 +172,10 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
   gint ret;
   guint16 port;
   RTSPUrl *url;
+  fd_set writefds;
+  fd_set readfds;
+  struct timeval tv, *tvp;
+  gint max_fd, retval;
 
   g_return_val_if_fail (conn != NULL, RTSP_EINVAL);
   g_return_val_if_fail (conn->url != NULL, RTSP_EINVAL);
@@ -207,12 +211,42 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
     goto sys_error;
 
   /* set to non-blocking mode so that we can cancel the connect */
-  //fcntl (fd, F_SETFL, O_NONBLOCK);
+  fcntl (fd, F_SETFL, O_NONBLOCK);
 
+  /* we are going to connect ASYNC now */
   ret = connect (fd, (struct sockaddr *) &sin, sizeof (sin));
-  if (ret != 0)
+  if (ret == 0)
+    goto done;
+  if (errno != EINPROGRESS)
     goto sys_error;
 
+  /* wait for connect to complete up to the specified timeout or until we got
+   * interrupted. */
+  FD_ZERO (&writefds);
+  FD_SET (fd, &writefds);
+  FD_ZERO (&readfds);
+  FD_SET (READ_SOCKET (conn), &readfds);
+
+  if (timeout->tv_sec != 0 || timeout->tv_usec != 0) {
+    tv.tv_sec = timeout->tv_sec;
+    tv.tv_usec = timeout->tv_usec;
+    tvp = &tv;
+  } else {
+    tvp = NULL;
+  }
+
+  max_fd = MAX (fd, READ_SOCKET (conn));
+
+  do {
+    retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp);
+  } while ((retval == -1 && errno == EINTR));
+
+  if (retval == 0)
+    goto timeout;
+  else if (retval == -1)
+    goto sys_error;
+
+done:
   conn->fd = fd;
   conn->ip = ip;
 
@@ -232,6 +266,10 @@ not_ip:
   {
     return RTSP_ENOTIP;
   }
+timeout:
+  {
+    return RTSP_ETIMEOUT;
+  }
 }
 
 static void
@@ -270,15 +308,61 @@ rtsp_connection_write (RTSPConnection * conn, const guint8 * data, guint size,
     GTimeVal * timeout)
 {
   guint towrite;
+  fd_set writefds;
+  fd_set readfds;
+  int max_fd;
+  gint retval;
+  struct timeval tv, *tvp;
 
   g_return_val_if_fail (conn != NULL, RTSP_EINVAL);
   g_return_val_if_fail (data != NULL || size == 0, RTSP_EINVAL);
 
+  FD_ZERO (&writefds);
+  FD_SET (conn->fd, &writefds);
+  FD_ZERO (&readfds);
+  FD_SET (READ_SOCKET (conn), &readfds);
+
+  max_fd = MAX (conn->fd, READ_SOCKET (conn));
+
+  if (timeout) {
+    tv.tv_sec = timeout->tv_sec;
+    tv.tv_usec = timeout->tv_usec;
+    tvp = &tv;
+  } else {
+    tvp = NULL;
+  }
+
   towrite = size;
 
   while (towrite > 0) {
     gint written;
 
+    do {
+      retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp);
+    } while ((retval == -1 && errno == EINTR));
+
+    if (retval == 0)
+      goto timeout;
+
+    if (retval == -1)
+      goto select_error;
+
+    if (FD_ISSET (READ_SOCKET (conn), &readfds)) {
+      /* read all stop commands */
+      while (TRUE) {
+        gchar command;
+        int res;
+
+        READ_COMMAND (conn, command, res);
+        if (res <= 0) {
+          /* no more commands */
+          break;
+        }
+      }
+      goto stopped;
+    }
+
+    /* now we can write */
     written = write (conn->fd, data, towrite);
     if (written < 0) {
       if (errno != EAGAIN && errno != EINTR)
@@ -291,6 +375,18 @@ rtsp_connection_write (RTSPConnection * conn, const guint8 * data, guint size,
   return RTSP_OK;
 
   /* ERRORS */
+timeout:
+  {
+    return RTSP_ETIMEOUT;
+  }
+select_error:
+  {
+    return RTSP_ESYS;
+  }
+stopped:
+  {
+    return RTSP_EINTR;
+  }
 write_error:
   {
     return RTSP_ESYS;