rtspsrc: rework reconnect code
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 7 Jan 2011 17:02:49 +0000 (18:02 +0100)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Tue, 17 May 2011 09:55:29 +0000 (11:55 +0200)
Use the same async code path to implement reconnects.
Make sure we only post progress messages when doing async things.

gst/rtsp/gstrtspsrc.c

index f372156..e838726 100644 (file)
@@ -3277,7 +3277,8 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source)
 }
 
 static GstRTSPResult
-gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info)
+gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info,
+    gboolean async)
 {
   GstRTSPResult res;
 
@@ -3312,8 +3313,9 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info)
 
   if (!info->connected) {
     /* connect */
-    GST_ELEMENT_PROGRESS (src, CONTINUE, "connect",
-        ("Connecting to %s", info->location));
+    if (async)
+      GST_ELEMENT_PROGRESS (src, CONTINUE, "connect",
+          ("Connecting to %s", info->location));
     GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location);
     if ((res =
             gst_rtsp_connection_connect (info->connection,
@@ -3365,13 +3367,14 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info,
 }
 
 static GstRTSPResult
-gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info)
+gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info,
+    gboolean async)
 {
   GstRTSPResult res;
 
   GST_DEBUG_OBJECT (src, "reconnecting connection...");
   gst_rtsp_conninfo_close (src, info, FALSE);
-  res = gst_rtsp_conninfo_connect (src, info);
+  res = gst_rtsp_conninfo_connect (src, info, async);
 
   return res;
 }
@@ -3768,7 +3771,6 @@ invalid_length:
 static GstFlowReturn
 gst_rtspsrc_loop_udp (GstRTSPSrc * src)
 {
-  gboolean restart = FALSE;
   GstRTSPResult res;
   GstRTSPMessage message = { 0 };
   gint retry = 0;
@@ -3809,7 +3811,6 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
         break;
       case GST_RTSP_EINTR:
         /* we got interrupted, see what we have to do */
-        GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush");
         goto interrupt;
       case GST_RTSP_ETIMEOUT:
         /* send keep-alive, ignore the result, a warning will be posted. */
@@ -3822,7 +3823,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
          * see what happens. */
         GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
             ("The server closed the connection."));
-        if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0)
+        if ((res =
+                gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0)
           goto connect_error;
 
         continue;
@@ -3868,17 +3870,79 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
     }
   }
 
-interrupt:
   /* we get here when the connection got interrupted */
-  GST_OBJECT_LOCK (src);
-  gst_rtspsrc_connection_flush (src, FALSE);
-  GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd);
-  if (src->loop_cmd != CMD_RECONNECT)
-    goto stopping;
+interrupt:
+  {
+    gst_rtsp_message_unset (&message);
+    GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush");
+    gst_rtspsrc_connection_flush (src, FALSE);
+    return GST_FLOW_WRONG_STATE;
+  }
+connect_error:
+  {
+    gchar *str = gst_rtsp_strresult (res);
+    GstFlowReturn ret;
+
+    src->conninfo.connected = FALSE;
+    if (res != GST_RTSP_EINTR) {
+      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+          ("Could not connect to server. (%s)", str));
+      g_free (str);
+      ret = GST_FLOW_ERROR;
+    } else {
+      ret = GST_FLOW_WRONG_STATE;
+    }
+    return ret;
+  }
+receive_error:
+  {
+    gchar *str = gst_rtsp_strresult (res);
+
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Could not receive message. (%s)", str));
+    g_free (str);
+    return GST_FLOW_ERROR;
+  }
+handle_request_failed:
+  {
+    gchar *str = gst_rtsp_strresult (res);
+    GstFlowReturn ret;
+
+    gst_rtsp_message_unset (&message);
+    if (res != GST_RTSP_EINTR) {
+      GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
+          ("Could not handle server message. (%s)", str));
+      g_free (str);
+      ret = GST_FLOW_ERROR;
+    } else {
+      ret = GST_FLOW_WRONG_STATE;
+    }
+    return ret;
+  }
+server_eof:
+  {
+    GST_DEBUG_OBJECT (src, "we got an eof from the server");
+    GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
+        ("The server closed the connection."));
+    src->conninfo.connected = FALSE;
+    gst_rtsp_message_unset (&message);
+    return GST_FLOW_UNEXPECTED;
+  }
+}
 
+static GstRTSPResult
+gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async)
+{
+  GstRTSPResult res = GST_RTSP_OK;
+  gboolean restart;
+
+  GST_DEBUG_OBJECT (src, "doing reconnect");
+
+  GST_OBJECT_LOCK (src);
   /* only restart when the pads were not yet activated, else we were
    * streaming over UDP */
   restart = src->need_activate;
+  src->flushing = FALSE;
   GST_OBJECT_UNLOCK (src);
 
   /* no need to restart, we're done */
@@ -3889,10 +3953,12 @@ interrupt:
   src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
 
   /* pause to prepare for a restart */
-  gst_rtspsrc_pause (src, FALSE, FALSE);
+  if ((res = gst_rtspsrc_pause (src, FALSE, async)) < 0)
+    goto done;
 
   /* close and cleanup our state */
-  gst_rtspsrc_close (src, FALSE);
+  if ((res = gst_rtspsrc_close (src, async)) < 0)
+    goto done;
 
   /* see if we have TCP left to try. Also don't try TCP when we were configured
    * with an SDP. */
@@ -3907,52 +3973,17 @@ interrupt:
           gst_guint64_to_gdouble (src->udp_timeout / 1000000.0)));
 
   /* open new connection using tcp */
-  if (!gst_rtspsrc_open (src, FALSE))
+  if (gst_rtspsrc_open (src, async) < 0)
     goto open_failed;
 
   /* start playback */
-  if (!gst_rtspsrc_play (src, &src->segment, FALSE))
+  if (gst_rtspsrc_play (src, &src->segment, async) < 0)
     goto play_failed;
 
 done:
-  return GST_FLOW_OK;
+  return res;
 
   /* ERRORS */
-stopping:
-  {
-    GST_DEBUG_OBJECT (src, "we are stopping");
-    GST_OBJECT_UNLOCK (src);
-    return GST_FLOW_WRONG_STATE;
-  }
-receive_error:
-  {
-    gchar *str = gst_rtsp_strresult (res);
-
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("Could not receive message. (%s)", str));
-    g_free (str);
-    return GST_FLOW_ERROR;
-  }
-handle_request_failed:
-  {
-    gchar *str = gst_rtsp_strresult (res);
-
-    GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
-        ("Could not handle server message. (%s)", str));
-    g_free (str);
-    gst_rtsp_message_unset (&message);
-    return GST_FLOW_ERROR;
-  }
-connect_error:
-  {
-    gchar *str = gst_rtsp_strresult (res);
-
-    src->conninfo.connected = FALSE;
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        ("Could not connect to server. (%s)", str));
-    g_free (str);
-    return GST_FLOW_ERROR;
-  }
 no_protocols:
   {
     src->cur_protocols = 0;
@@ -3973,15 +4004,6 @@ play_failed:
     GST_DEBUG_OBJECT (src, "play failed");
     return GST_FLOW_OK;
   }
-server_eof:
-  {
-    GST_DEBUG_OBJECT (src, "we got an eof from the server");
-    GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
-        ("The server closed the connection."));
-    src->conninfo.connected = FALSE;
-    gst_rtsp_message_unset (&message);
-    return GST_FLOW_UNEXPECTED;
-  }
 }
 
 static void
@@ -4535,7 +4557,9 @@ receive_error:
         if (try == 0) {
           try++;
           /* if reconnect succeeds, try again */
-          if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) == 0)
+          if ((res =
+                  gst_rtsp_conninfo_reconnect (src, &src->conninfo,
+                      FALSE)) == 0)
             goto again;
         }
         /* only try once after reconnect, then fallthrough and error out */
@@ -5068,7 +5092,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
     }
 
     if (src->conninfo.connection == NULL) {
-      if (!gst_rtsp_conninfo_connect (src, &stream->conninfo)) {
+      if (!gst_rtsp_conninfo_connect (src, &stream->conninfo, async)) {
         GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream);
         continue;
       }
@@ -5469,7 +5493,7 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp,
       src->conninfo.location = g_strdup (control);
       /* make a connection for this, if there was a connection already, nothing
        * happens. */
-      if (gst_rtsp_conninfo_connect (src, &src->conninfo) < 0) {
+      if (gst_rtsp_conninfo_connect (src, &src->conninfo, async) < 0) {
         GST_ERROR_OBJECT (src, "could not connect");
       }
     }
@@ -5527,7 +5551,7 @@ restart:
     goto no_url;
   src->tried_url_auth = FALSE;
 
-  if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo)) < 0)
+  if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo, async)) < 0)
     goto connect_failed;
 
   /* create OPTIONS */
@@ -6381,6 +6405,11 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
     case CMD_LOOP:
       running = gst_rtspsrc_loop (src);
       break;
+    case CMD_RECONNECT:
+      ret = gst_rtspsrc_reconnect (src, FALSE);
+      if (ret == GST_RTSP_OK)
+        running = TRUE;
+      break;
     default:
       break;
   }