rtspsrc: implement more async handling
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 7 Jan 2011 12:43:06 +0000 (13:43 +0100)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Tue, 17 May 2011 09:55:20 +0000 (11:55 +0200)
Remove some old locks.
Make sure we never go into the loop function when flushing.

gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h

index 404a3621527b92d37adc472797186cea981d2ca9..b2c1c42bbb62fc916924f58e5bec6bfe90f8fce6 100644 (file)
@@ -270,8 +270,7 @@ static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event,
 #define CMD_CLOSE      3
 #define CMD_WAIT       4
 #define CMD_RECONNECT  5
-#define CMD_STOP       6
-#define CMD_LOOP       7
+#define CMD_LOOP       6
 
 #define GST_ELEMENT_PROGRESS(el, type, code, text)      \
 G_STMT_START {                                          \
@@ -527,10 +526,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class)
   src->state_rec_lock = g_new (GStaticRecMutex, 1);
   g_static_rec_mutex_init (src->state_rec_lock);
 
-  /* protects access to the server connection */
-  src->conn_rec_lock = g_new (GStaticRecMutex, 1);
-  g_static_rec_mutex_init (src->conn_rec_lock);
-
   src->state = GST_RTSP_STATE_INVALID;
 }
 
@@ -558,8 +553,6 @@ gst_rtspsrc_finalize (GObject * object)
   g_free (rtspsrc->stream_rec_lock);
   g_static_rec_mutex_free (rtspsrc->state_rec_lock);
   g_free (rtspsrc->state_rec_lock);
-  g_static_rec_mutex_free (rtspsrc->conn_rec_lock);
-  g_free (rtspsrc->conn_rec_lock);
 
 #ifdef G_OS_WIN32
   WSACleanup ();
@@ -1668,7 +1661,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush)
   if (flush) {
     event = gst_event_new_flush_start ();
     GST_DEBUG_OBJECT (src, "start flush");
-    cmd = CMD_STOP;
+    cmd = CMD_WAIT;
     state = GST_STATE_PAUSED;
   } else {
     event = gst_event_new_flush_stop ();
@@ -1718,12 +1711,10 @@ gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn,
 {
   GstRTSPResult ret;
 
-  GST_RTSP_CONN_LOCK (src);
   if (conn)
     ret = gst_rtsp_connection_send (conn, message, timeout);
   else
     ret = GST_RTSP_ERROR;
-  GST_RTSP_CONN_UNLOCK (src);
 
   return ret;
 }
@@ -1734,12 +1725,10 @@ gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn,
 {
   GstRTSPResult ret;
 
-  GST_RTSP_CONN_LOCK (src);
   if (conn)
     ret = gst_rtsp_connection_receive (conn, message, timeout);
   else
     ret = GST_RTSP_ERROR;
-  GST_RTSP_CONN_UNLOCK (src);
 
   return ret;
 }
@@ -3537,8 +3526,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
     /* see if the timeout period expired */
     if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
       GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
-      /* send keep-alive, ignore the result, a warning will be posted. */
-      gst_rtspsrc_send_keep_alive (src);
+      /* send keep-alive, only act on interrupt, a warning will be posted for
+       * other errors. */
+      if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
+        goto interrupt;
       /* get new timeout */
       gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
     }
@@ -3546,11 +3537,23 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
     GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
         tv_timeout.tv_sec, tv_timeout.tv_usec);
 
-    /* protect the connection with the connection lock so that we can see when
-     * we are finished doing server communication */
-    res =
-        gst_rtspsrc_connection_receive (src, src->conninfo.connection, &message,
-        src->ptcp_timeout);
+    GST_OBJECT_LOCK (src);
+    if (src->loop_cmd == CMD_LOOP && !src->flushing) {
+      src->waiting = TRUE;
+      GST_OBJECT_UNLOCK (src);
+
+      /* protect the connection with the connection lock so that we can see when
+       * we are finished doing server communication */
+      res =
+          gst_rtspsrc_connection_receive (src, src->conninfo.connection,
+          &message, src->ptcp_timeout);
+
+      GST_OBJECT_LOCK (src);
+      src->waiting = FALSE;
+    } else {
+      res = GST_RTSP_EINTR;
+    }
+    GST_OBJECT_UNLOCK (src);
 
     switch (res) {
       case GST_RTSP_OK:
@@ -3562,7 +3565,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
       case GST_RTSP_ETIMEOUT:
         /* no reply, send keep alive */
         GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
-        gst_rtspsrc_send_keep_alive (src);
+        if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
+          goto interrupt;
         continue;
       case GST_RTSP_EEOF:
         /* go EOS when the server closed the connection */
@@ -3779,12 +3783,25 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
         (gint) tv_timeout.tv_sec);
 
     gst_rtsp_message_unset (&message);
-    /* we should continue reading the TCP socket because the server might
-     * send us requests. When the session timeout expires, we need to send a
-     * keep-alive request to keep the session open. */
-    res =
-        gst_rtspsrc_connection_receive (src, src->conninfo.connection,
-        &message, &tv_timeout);
+
+    GST_OBJECT_LOCK (src);
+    if (src->loop_cmd == CMD_LOOP && !src->flushing) {
+      src->waiting = TRUE;
+      GST_OBJECT_UNLOCK (src);
+
+      /* we should continue reading the TCP socket because the server might
+       * send us requests. When the session timeout expires, we need to send a
+       * keep-alive request to keep the session open. */
+      res =
+          gst_rtspsrc_connection_receive (src, src->conninfo.connection,
+          &message, &tv_timeout);
+
+      GST_OBJECT_LOCK (src);
+      src->waiting = FALSE;
+    } else {
+      res = GST_RTSP_EINTR;
+    }
+    GST_OBJECT_UNLOCK (src);
 
     switch (res) {
       case GST_RTSP_OK:
@@ -3797,7 +3814,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
       case GST_RTSP_ETIMEOUT:
         /* send keep-alive, ignore the result, a warning will be posted. */
         GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
-        gst_rtspsrc_send_keep_alive (src);
+        if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
+          goto interrupt;
         continue;
       case GST_RTSP_EEOF:
         /* server closed the connection. not very fatal for UDP, reconnect and
@@ -3832,7 +3850,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
           GST_DEBUG_OBJECT (src, "but is Unauthorized response ...");
           if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) {
             GST_DEBUG_OBJECT (src, "so retrying keep-alive");
-            gst_rtspsrc_send_keep_alive (src);
+            if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
+              goto interrupt;
           }
         } else {
           retry = 0;
@@ -3857,7 +3876,6 @@ interrupt:
   if (src->loop_cmd != CMD_RECONNECT)
     goto stopping;
 
-
   /* when we get here we have to reconnect using tcp */
   src->loop_cmd = CMD_LOOP;
 
@@ -3974,12 +3992,14 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush)
 {
   GST_OBJECT_LOCK (src);
   src->loop_cmd = cmd;
+  src->flushing = flush;
   if (flush) {
-    GST_DEBUG_OBJECT (src, "start connection flush");
-    gst_rtspsrc_connection_flush (src, TRUE);
+    if (src->waiting) {
+      GST_DEBUG_OBJECT (src, "start connection flush");
+      gst_rtspsrc_connection_flush (src, TRUE);
+    }
   } else {
     GST_DEBUG_OBJECT (src, "stop connection flush");
-    gst_rtspsrc_connection_flush (src, FALSE);
   }
   if (src->task)
     gst_task_start (src->task);
@@ -5862,13 +5882,6 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async)
   if (!src->conninfo.connection || !src->conninfo.connected)
     goto done;
 
-  /* waiting for connection idle, we were flushing so any attempt at doing data
-   * transfer will result in pausing the tasks. */
-  GST_DEBUG_OBJECT (src, "wait for connection idle");
-  GST_RTSP_CONN_LOCK (src);
-  GST_DEBUG_OBJECT (src, "connection is idle now");
-  GST_RTSP_CONN_UNLOCK (src);
-
   /* send some dummy packets before we activate the receive in the
    * udp sources */
   gst_rtspsrc_send_dummy_packets (src);
@@ -6072,13 +6085,6 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async)
   if (src->state == GST_RTSP_STATE_READY)
     goto was_paused;
 
-  /* waiting for connection idle, we were flushing so any attempt at doing data
-   * transfer will result in pausing the tasks. */
-  GST_DEBUG_OBJECT (src, "wait for connection idle");
-  GST_RTSP_CONN_LOCK (src);
-  GST_DEBUG_OBJECT (src, "connection is idle now");
-  GST_RTSP_CONN_UNLOCK (src);
-
   if (!src->conninfo.connection || !src->conninfo.connected)
     goto no_connection;
 
@@ -6272,7 +6278,8 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
 
   GST_OBJECT_LOCK (src);
   cmd = src->loop_cmd;
-  src->loop_cmd = CMD_WAIT;
+  if (cmd != CMD_LOOP || src->flushing)
+    src->loop_cmd = CMD_WAIT;
   GST_DEBUG_OBJECT (src, "got command %d", cmd);
   GST_OBJECT_UNLOCK (src);
 
@@ -6301,8 +6308,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
 
   GST_OBJECT_LOCK (src);
   /* and go back to sleep */
-  if (!running && src->loop_cmd == CMD_WAIT && src->task)
-    gst_task_pause (src->task);
+  if (src->loop_cmd == CMD_WAIT) {
+    if (running)
+      src->loop_cmd = CMD_LOOP;
+    else if (src->task)
+      gst_task_pause (src->task);
+  }
   GST_OBJECT_UNLOCK (src);
 }
 
index c09442bc24e73c0f60939628b4c777060ca5e977..b5744248cc3337f15ffd564165ae9a80b5ac6b61 100644 (file)
@@ -79,10 +79,6 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass;
 #define GST_RTSP_STREAM_LOCK(rtsp)       (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
 #define GST_RTSP_STREAM_UNLOCK(rtsp)     (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
 
-#define GST_RTSP_CONN_GET_LOCK(rtsp)     (GST_RTSPSRC_CAST(rtsp)->conn_rec_lock)
-#define GST_RTSP_CONN_LOCK(rtsp)         (g_static_rec_mutex_lock (GST_RTSP_CONN_GET_LOCK(rtsp)))
-#define GST_RTSP_CONN_UNLOCK(rtsp)       (g_static_rec_mutex_unlock (GST_RTSP_CONN_GET_LOCK(rtsp)))
-
 typedef struct _GstRTSPConnInfo GstRTSPConnInfo;
 
 struct _GstRTSPConnInfo {
@@ -184,13 +180,12 @@ struct _GstRTSPSrc {
   /* UDP mode loop */
   gint             loop_cmd;
   gboolean         ignore_timeout;
+  gboolean         flushing;
+  gboolean         waiting;
 
   /* mutex for protecting state changes */
   GStaticRecMutex *state_rec_lock;
 
-  /* mutex for protecting the connection */
-  GStaticRecMutex *conn_rec_lock;
-
   GstSDPMessage   *sdp;
   gboolean         from_sdp;
   gint             numstreams;