gst/rtsp/gstrtspsrc.*: Fix race when multiple udp sources post timeouts, just act...
authorWim Taymans <wim.taymans@gmail.com>
Wed, 2 May 2007 19:32:58 +0000 (19:32 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 2 May 2007 19:32:58 +0000 (19:32 +0000)
Original commit message from CVS:
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init),
(gst_rtspsrc_finalize), (new_session_pad), (request_pt_map),
(gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send),
(gst_rtspsrc_send), (gst_rtspsrc_async_open), (gst_rtspsrc_close),
(gst_rtspsrc_play), (gst_rtspsrc_handle_message),
(gst_rtspsrc_change_state):
* gst/rtsp/gstrtspsrc.h:
Fix race when multiple udp sources post timeouts, just act on the first
received timeout.
Protect stream list with a recursive lock to fix some races.
Flush connection when we need to do a reconnect or stop.
Make state lock recursive.
* gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
(rtsp_connection_close):
Some small cleanups.

ChangeLog
gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h
gst/rtsp/rtspconnection.c

index 157166f..8ad67ef 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,24 @@
 2007-05-02  Wim Taymans  <wim@fluendo.com>
 
+       * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init),
+       (gst_rtspsrc_finalize), (new_session_pad), (request_pt_map),
+       (gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send),
+       (gst_rtspsrc_send), (gst_rtspsrc_async_open), (gst_rtspsrc_close),
+       (gst_rtspsrc_play), (gst_rtspsrc_handle_message),
+       (gst_rtspsrc_change_state):
+       * gst/rtsp/gstrtspsrc.h:
+       Fix race when multiple udp sources post timeouts, just act on the first
+       received timeout.
+       Protect stream list with a recursive lock to fix some races.
+       Flush connection when we need to do a reconnect or stop.
+       Make state lock recursive.
+
+       * gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
+       (rtsp_connection_close):
+       Some small cleanups.
+
+2007-05-02  Wim Taymans  <wim@fluendo.com>
+
        * gst/wavparse/gstwavparse.c: (gst_wavparse_perform_seek),
        (gst_wavparse_stream_headers), (gst_wavparse_stream_data):
        Only set DISCONT when there actually is a discont or when we just
index a99a102..65cb3c9 100644 (file)
@@ -179,12 +179,6 @@ static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element,
     GstStateChange transition);
 static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message);
 
-static gboolean gst_rtspsrc_setup_auth (GstRTSPSrc * src,
-    RTSPMessage * response);
-static gboolean gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
-    RTSPMessage * response, RTSPStatusCode * code);
-
-
 static gboolean gst_rtspsrc_open (GstRTSPSrc * src);
 static gboolean gst_rtspsrc_play (GstRTSPSrc * src);
 static gboolean gst_rtspsrc_pause (GstRTSPSrc * src);
@@ -301,7 +295,8 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class)
 #endif
   src->extension->src = (gpointer) src;
 
-  src->state_lock = g_mutex_new ();
+  src->state_rec_lock = g_new (GStaticRecMutex, 1);
+  g_static_rec_mutex_init (src->state_rec_lock);
   src->state = RTSP_STATE_INVALID;
 }
 
@@ -319,7 +314,8 @@ gst_rtspsrc_finalize (GObject * object)
   g_free (rtspsrc->content_base);
   rtsp_url_free (rtspsrc->url);
   g_free (rtspsrc->addr);
-  g_mutex_free (rtspsrc->state_lock);
+  g_static_rec_mutex_free (rtspsrc->state_rec_lock);
+  g_free (rtspsrc->state_rec_lock);
 
   if (rtspsrc->extension) {
 #ifdef WITH_EXT_REAL
@@ -1050,9 +1046,11 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src)
   gint id, ssrc, pt;
   GList *lstream;
   GstRTSPStream *stream;
+  gboolean all_added;
 
   GST_DEBUG_OBJECT (src, "got new session pad %" GST_PTR_FORMAT, pad);
 
+  GST_RTSP_STATE_LOCK (src);
   /* find stream */
   name = gst_object_get_name (GST_OBJECT_CAST (pad));
   if (sscanf (name, "recv_rtp_src_%d_%d_%d", &id, &ssrc, &pt) != 3)
@@ -1079,23 +1077,30 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src)
   gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad);
 
   /* check if we added all streams */
+  all_added = TRUE;
   for (lstream = src->streams; lstream; lstream = g_list_next (lstream)) {
     stream = (GstRTSPStream *) lstream->data;
-    if (!stream->added)
-      goto done;
+    if (!stream->added) {
+      all_added = FALSE;
+      break;
+    }
+  }
+  GST_RTSP_STATE_UNLOCK (src);
+
+  if (all_added) {
+    GST_DEBUG_OBJECT (src, "We added all streams");
+    /* when we get here, all stream are added and we can fire the no-more-pads
+     * signal. */
+    gst_element_no_more_pads (GST_ELEMENT_CAST (src));
   }
-  GST_DEBUG_OBJECT (src, "We added all streams");
-  /* when we get here, all stream are added and we can fire the no-more-pads
-   * signal. */
-  gst_element_no_more_pads (GST_ELEMENT_CAST (src));
 
-done:
   return;
 
   /* ERRORS */
 unknown_stream:
   {
     GST_DEBUG_OBJECT (src, "ignoring unknown stream");
+    GST_RTSP_STATE_UNLOCK (src);
     g_free (name);
     return;
   }
@@ -1106,21 +1111,26 @@ request_pt_map (GstElement * sess, guint session, guint pt, GstRTSPSrc * src)
 {
   GstRTSPStream *stream;
   GList *lstream;
+  GstCaps *caps;
 
   GST_DEBUG_OBJECT (src, "getting pt map for pt %d in session %d", pt, session);
 
+  GST_RTSP_STATE_LOCK (src);
   lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (session),
       (GCompareFunc) find_stream_by_id);
   if (!lstream)
     goto unknown_stream;
 
   stream = (GstRTSPStream *) lstream->data;
+  caps = stream->caps;
+  GST_RTSP_STATE_UNLOCK (src);
 
-  return stream->caps;
+  return caps;
 
 unknown_stream:
   {
     GST_DEBUG_OBJECT (src, "unknown stream %d", session);
+    GST_RTSP_STATE_UNLOCK (src);
     return NULL;
   }
 }
@@ -1852,7 +1862,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
           break;
         case RTSP_EINTR:
           /* we got interrupted, see what we have to do */
-          GST_DEBUG_OBJECT (src, "we got interrupted");
+          GST_DEBUG_OBJECT (src, "we got interrupted, unset flushing");
           /* unset flushing so we can do something else */
           rtsp_connection_flush (src->connection, FALSE);
           goto interrupt;
@@ -1992,12 +2002,14 @@ play_failed:
 }
 
 static void
-gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd)
+gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush)
 {
   GST_OBJECT_LOCK (src);
   src->loop_cmd = cmd;
-  if (cmd != CMD_WAIT)
+  if (flush) {
+    GST_DEBUG_OBJECT (src, "start flush");
     rtsp_connection_flush (src->connection, TRUE);
+  }
   GST_OBJECT_UNLOCK (src);
 }
 
@@ -2153,76 +2165,6 @@ no_user_pass:
   }
 }
 
-/**
- * gst_rtspsrc_send:
- * @src: the rtsp source
- * @request: must point to a valid request
- * @response: must point to an empty #RTSPMessage
- *
- * send @request and retrieve the response in @response. optionally @code can be
- * non-NULL in which case it will contain the status code of the response.
- *
- * If This function returns TRUE, @response will contain a valid response
- * message that should be cleaned with rtsp_message_unset() after usage. 
- *
- * If @code is NULL, this function will return FALSE (with an invalid @response
- * message) if the response code was not 200 (OK).
- *
- * If the attempt results in an authentication failure, then this will attempt
- * to retrieve authentication credentials via gst_rtspsrc_setup_auth and retry
- * the request.
- *
- * Returns: TRUE if the processing was successful.
- */
-gboolean
-gst_rtspsrc_send (GstRTSPSrc * src, RTSPMessage * request,
-    RTSPMessage * response, RTSPStatusCode * code)
-{
-  RTSPStatusCode int_code = RTSP_STS_OK;
-  gboolean res;
-  gboolean retry;
-
-  do {
-    retry = FALSE;
-    res = gst_rtspsrc_try_send (src, request, response, &int_code);
-
-    if (int_code == RTSP_STS_UNAUTHORIZED) {
-      if (gst_rtspsrc_setup_auth (src, response)) {
-        /* Try the request/response again after configuring the auth info
-         * and loop again */
-        retry = TRUE;
-      }
-    }
-  } while (retry == TRUE);
-
-  /* If the user requested the code, let them handle errors, otherwise
-   * post an error below */
-  if (code != NULL)
-    *code = int_code;
-  else if (int_code != RTSP_STS_OK)
-    goto error_response;
-
-  return res;
-
-error_response:
-  {
-    switch (response->type_data.response.code) {
-      case RTSP_STS_NOT_FOUND:
-        GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), ("%s",
-                response->type_data.response.reason));
-        break;
-      default:
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("Got error response: %d (%s).", response->type_data.response.code,
-                response->type_data.response.reason));
-        break;
-    }
-    /* we return FALSE so we should unset the response ourselves */
-    rtsp_message_unset (response);
-    return FALSE;
-  }
-}
-
 static gboolean
 gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
     RTSPMessage * response, RTSPStatusCode * code)
@@ -2234,6 +2176,8 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
   if (src->extension && src->extension->before_send)
     src->extension->before_send (src->extension, request);
 
+  GST_DEBUG_OBJECT (src, "sending message");
+
   if (src->debug)
     rtsp_message_dump (request);
 
@@ -2309,6 +2253,76 @@ handle_request_failed:
   }
 }
 
+/**
+ * gst_rtspsrc_send:
+ * @src: the rtsp source
+ * @request: must point to a valid request
+ * @response: must point to an empty #RTSPMessage
+ *
+ * send @request and retrieve the response in @response. optionally @code can be
+ * non-NULL in which case it will contain the status code of the response.
+ *
+ * If This function returns TRUE, @response will contain a valid response
+ * message that should be cleaned with rtsp_message_unset() after usage. 
+ *
+ * If @code is NULL, this function will return FALSE (with an invalid @response
+ * message) if the response code was not 200 (OK).
+ *
+ * If the attempt results in an authentication failure, then this will attempt
+ * to retrieve authentication credentials via gst_rtspsrc_setup_auth and retry
+ * the request.
+ *
+ * Returns: TRUE if the processing was successful.
+ */
+gboolean
+gst_rtspsrc_send (GstRTSPSrc * src, RTSPMessage * request,
+    RTSPMessage * response, RTSPStatusCode * code)
+{
+  RTSPStatusCode int_code = RTSP_STS_OK;
+  gboolean res;
+  gboolean retry;
+
+  do {
+    retry = FALSE;
+    res = gst_rtspsrc_try_send (src, request, response, &int_code);
+
+    if (int_code == RTSP_STS_UNAUTHORIZED) {
+      if (gst_rtspsrc_setup_auth (src, response)) {
+        /* Try the request/response again after configuring the auth info
+         * and loop again */
+        retry = TRUE;
+      }
+    }
+  } while (retry == TRUE);
+
+  /* If the user requested the code, let them handle errors, otherwise
+   * post an error below */
+  if (code != NULL)
+    *code = int_code;
+  else if (int_code != RTSP_STS_OK)
+    goto error_response;
+
+  return res;
+
+error_response:
+  {
+    switch (response->type_data.response.code) {
+      case RTSP_STS_NOT_FOUND:
+        GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), ("%s",
+                response->type_data.response.reason));
+        break;
+      default:
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("Got error response: %d (%s).", response->type_data.response.code,
+                response->type_data.response.reason));
+        break;
+    }
+    /* we return FALSE so we should unset the response ourselves */
+    rtsp_message_unset (response);
+    return FALSE;
+  }
+}
+
 /* parse the response and collect all the supported methods. We need this
  * information so that we don't try to send an unsupported request to the
  * server.
@@ -2896,6 +2910,23 @@ cleanup_error:
   }
 }
 
+#if 0
+static gboolean
+gst_rtspsrc_async_open (GstRTSPSrc * src)
+{
+  GError *error = NULL;
+  gboolean res = TRUE;
+
+  src->thread =
+      g_thread_create ((GThreadFunc) gst_rtspsrc_open, src, TRUE, &error);
+  if (error != NULL) {
+    GST_ELEMENT_ERROR (src, RESOURCE, INIT, (NULL),
+        ("Could not start async thread (%s).", error->message));
+  }
+  return res;
+}
+#endif
+
 static gboolean
 gst_rtspsrc_close (GstRTSPSrc * src)
 {
@@ -2907,15 +2938,15 @@ gst_rtspsrc_close (GstRTSPSrc * src)
 
   GST_RTSP_STATE_LOCK (src);
 
-  gst_rtspsrc_loop_send_cmd (src, CMD_STOP);
+  gst_rtspsrc_loop_send_cmd (src, CMD_STOP, TRUE);
 
   /* stop task if any */
   if (src->task) {
     gst_task_stop (src->task);
 
     /* make sure it is not running */
-    g_static_rec_mutex_lock (src->stream_rec_lock);
-    g_static_rec_mutex_unlock (src->stream_rec_lock);
+    GST_RTSP_STREAM_LOCK (src);
+    GST_RTSP_STREAM_UNLOCK (src);
 
     /* no wait for the task to finish */
     gst_task_join (src->task);
@@ -2925,6 +2956,9 @@ gst_rtspsrc_close (GstRTSPSrc * src)
     src->task = NULL;
   }
 
+  GST_DEBUG_OBJECT (src, "stop flush");
+  rtsp_connection_flush (src->connection, FALSE);
+
   if (src->methods & RTSP_PLAY) {
     /* do TEARDOWN */
     res =
@@ -3096,7 +3130,6 @@ gst_rtspsrc_play (GstRTSPSrc * src)
    * Play Time) and should be put in the NEWSEGMENT position field. */
   rtsp_message_get_header (&response, RTSP_HDR_RANGE, &range);
 
-
   /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp
    * for the RTP packets. If this is not present, we assume all starts from 0... 
    * FIXME, this is info for the RTP session manager ideally. */
@@ -3111,11 +3144,11 @@ gst_rtspsrc_play (GstRTSPSrc * src)
    * For UDP we start the task as well to look for server info and UDP timeouts. */
   if (src->task == NULL) {
     src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src);
-    gst_task_set_lock (src->task, src->stream_rec_lock);
+    gst_task_set_lock (src->task, GST_RTSP_STREAM_GET_LOCK (src));
   }
   src->running = TRUE;
   src->state = RTSP_STATE_PLAYING;
-  gst_rtspsrc_loop_send_cmd (src, CMD_WAIT);
+  gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, FALSE);
   gst_task_start (src->task);
 
 done:
@@ -3227,8 +3260,21 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
       const GstStructure *s = gst_message_get_structure (message);
 
       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
+        gboolean ignore_timeout;
+
         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
-        gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT);
+
+        GST_OBJECT_LOCK (rtspsrc);
+        ignore_timeout = rtspsrc->ignore_timeout;
+        rtspsrc->ignore_timeout = TRUE;
+        GST_OBJECT_UNLOCK (rtspsrc);
+
+        /* we only act on the first udp timeout message, others are irrelevant
+         * and can be ignored. */
+        if (ignore_timeout)
+          gst_message_unref (message);
+        else
+          gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT, TRUE);
         return;
       }
       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
@@ -3300,10 +3346,13 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       rtspsrc->cur_protocols = rtspsrc->protocols;
+      /* first attempt, don't ignore timeouts */
+      rtspsrc->ignore_timeout = FALSE;
       if (!gst_rtspsrc_open (rtspsrc))
         goto open_failed;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      GST_DEBUG_OBJECT (rtspsrc, "stop flush");
       rtsp_connection_flush (rtspsrc->connection, FALSE);
       /* FIXME, the server might send UDP packets before we activate the UDP
        * ports */
@@ -3311,6 +3360,7 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      GST_DEBUG_OBJECT (rtspsrc, "start flush");
       rtsp_connection_flush (rtspsrc->connection, TRUE);
       break;
     default:
index 96569bb..f2221f7 100644 (file)
@@ -67,9 +67,13 @@ G_BEGIN_DECLS
 typedef struct _GstRTSPSrc GstRTSPSrc;
 typedef struct _GstRTSPSrcClass GstRTSPSrcClass;
 
-#define GST_RTSP_STATE_GET_LOCK(rtsp)    (GST_RTSPSRC_CAST(rtsp)->state_lock)
-#define GST_RTSP_STATE_LOCK(rtsp)        (g_mutex_lock (GST_RTSP_STATE_GET_LOCK(rtsp)))
-#define GST_RTSP_STATE_UNLOCK(rtsp)      (g_mutex_unlock (GST_RTSP_STATE_GET_LOCK(rtsp)))
+#define GST_RTSP_STATE_GET_LOCK(rtsp)    (GST_RTSPSRC_CAST(rtsp)->state_rec_lock)
+#define GST_RTSP_STATE_LOCK(rtsp)        (g_static_rec_mutex_lock (GST_RTSP_STATE_GET_LOCK(rtsp)))
+#define GST_RTSP_STATE_UNLOCK(rtsp)      (g_static_rec_mutex_unlock (GST_RTSP_STATE_GET_LOCK(rtsp)))
+
+#define GST_RTSP_STREAM_GET_LOCK(rtsp)    (GST_RTSPSRC_CAST(rtsp)->stream_rec_lock)
+#define GST_RTSP_STREAM_LOCK(rtsp)        (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
+#define GST_RTSP_STREAM_UNLOCK(rtsp)      (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
 
 typedef struct _GstRTSPStream GstRTSPStream;
 
@@ -121,9 +125,12 @@ struct _GstRTSPSrc {
   gboolean         running;
   gint             free_channel;
 
-  /* cond to signal loop */
+  /* UDP mode loop */
   gint             loop_cmd;
-  GMutex          *state_lock;
+  gboolean         ignore_timeout;
+
+  /* mutex for protecting state changes */
+  GStaticRecMutex *state_rec_lock;
 
   gint             numstreams;
   GList           *streams;
index e2d375e..e5ad277 100644 (file)
@@ -205,6 +205,9 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
   if (fd == -1)
     goto sys_error;
 
+  /* set to non-blocking mode so that we can cancel the connect */
+  //fcntl (fd, F_SETFL, O_NONBLOCK);
+
   ret = connect (fd, (struct sockaddr *) &sin, sizeof (sin));
   if (ret != 0)
     goto sys_error;
@@ -216,6 +219,8 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
 
 sys_error:
   {
+    if (fd != -1)
+      CLOSE_SOCKET (fd);
     return RTSP_ESYS;
   }
 not_resolved:
@@ -828,7 +833,6 @@ rtsp_connection_close (RTSPConnection * conn)
   gint res;
 
   g_return_val_if_fail (conn != NULL, RTSP_EINVAL);
-  g_return_val_if_fail (conn->fd >= 0, RTSP_EINVAL);
 
   if (conn->fd != -1) {
     res = CLOSE_SOCKET (conn->fd);