rtspsrc: Use a mutex for protecting against concurrent send/receives
authorSebastian Dröge <sebastian@centricular.com>
Thu, 15 Jun 2017 07:40:51 +0000 (10:40 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 15 Jun 2017 12:25:23 +0000 (15:25 +0300)
We currently send data to the RTSP connection from multiple threads:
whenever a command is to be handled and whenever RTCP is generated. This
can cause data corruption or worse if both happen at the same time.

As such, protect gst_rtsp_connection_send() and gst_rtsp_connection_receive()
calls with a mutex. While this means that we hold a mutex during the IO
operation, this is not actually a problem as the IO operation can be
interrupted (gst_rtsp_connection_flush()) at any time and is blocking by
itself anyway.

gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h

index a8892d3..6236771 100644 (file)
@@ -2067,29 +2067,35 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
 }
 
 static GstRTSPResult
-gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn,
+gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * message, GTimeVal * timeout)
 {
   GstRTSPResult ret;
 
-  if (conn)
-    ret = gst_rtsp_connection_send (conn, message, timeout);
-  else
+  if (conninfo->connection) {
+    g_mutex_lock (&conninfo->send_lock);
+    ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
+    g_mutex_unlock (&conninfo->send_lock);
+  } else {
     ret = GST_RTSP_ERROR;
+  }
 
   return ret;
 }
 
 static GstRTSPResult
-gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn,
+gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * message, GTimeVal * timeout)
 {
   GstRTSPResult ret;
 
-  if (conn)
-    ret = gst_rtsp_connection_receive (conn, message, timeout);
-  else
+  if (conninfo->connection) {
+    g_mutex_lock (&conninfo->send_lock);
+    ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
+    g_mutex_unlock (&conninfo->send_lock);
+  } else {
     ret = GST_RTSP_ERROR;
+  }
 
   return ret;
 }
@@ -2503,7 +2509,7 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   guint size;
   GstRTSPResult ret;
   GstRTSPMessage message = { 0 };
-  GstRTSPConnection *conn;
+  GstRTSPConnInfo *conninfo;
 
   stream = (GstRTSPStream *) gst_pad_get_element_private (pad);
   src = stream->parent;
@@ -2518,12 +2524,12 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   gst_rtsp_message_take_body (&message, data, size);
 
   if (stream->conninfo.connection)
-    conn = stream->conninfo.connection;
+    conninfo = &stream->conninfo;
   else
-    conn = src->conninfo.connection;
+    conninfo = &src->conninfo;
 
   GST_DEBUG_OBJECT (src, "sending %u bytes RTCP", size);
-  ret = gst_rtspsrc_connection_send (src, conn, &message, NULL);
+  ret = gst_rtspsrc_connection_send (src, conninfo, &message, NULL);
   GST_DEBUG_OBJECT (src, "sent RTCP, %d", ret);
 
   /* and steal it away again because we will free it when unreffing the
@@ -4213,6 +4219,10 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info,
         goto could_not_connect;
     }
   } while (!info->connected && retry);
+
+  g_mutex_init (&info->send_lock);
+  g_mutex_init (&info->recv_lock);
+
   gst_rtsp_message_unset (&response);
   return GST_RTSP_OK;
 
@@ -4257,6 +4267,9 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info,
     gst_rtsp_connection_free (info->connection);
     info->connection = NULL;
     info->flushing = FALSE;
+
+    g_mutex_clear (&info->send_lock);
+    g_mutex_clear (&info->recv_lock);
   }
   GST_RTSP_STATE_UNLOCK (src);
   return GST_RTSP_OK;
@@ -4317,7 +4330,7 @@ gst_rtspsrc_init_request (GstRTSPSrc * src, GstRTSPMessage * msg,
 
 /* FIXME, handle server request, reply with OK, for now */
 static GstRTSPResult
-gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn,
+gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * request)
 {
   GstRTSPMessage response = { 0 };
@@ -4346,7 +4359,7 @@ gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn,
     if (src->debug)
       gst_rtsp_message_dump (&response);
 
-    res = gst_rtspsrc_connection_send (src, conn, &response, NULL);
+    res = gst_rtspsrc_connection_send (src, conninfo, &response, NULL);
     if (res < 0)
       goto send_error;
 
@@ -4398,9 +4411,7 @@ gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
   if (src->debug)
     gst_rtsp_message_dump (&request);
 
-  res =
-      gst_rtspsrc_connection_send (src, src->conninfo.connection, &request,
-      NULL);
+  res = gst_rtspsrc_connection_send (src, &src->conninfo, &request, NULL);
   if (res < 0)
     goto send_error;
 
@@ -4681,7 +4692,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
     /* protect the connection with the connection lock so that we can see when
      * we are finished doing server communication */
     res =
-        gst_rtspsrc_connection_receive (src, src->conninfo.connection,
+        gst_rtspsrc_connection_receive (src, &src->conninfo,
         &message, src->ptcp_timeout);
 
     switch (res) {
@@ -4707,9 +4718,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
     switch (message.type) {
       case GST_RTSP_MESSAGE_REQUEST:
         /* server sends us a request message, handle it */
-        res =
-            gst_rtspsrc_handle_request (src, src->conninfo.connection,
-            &message);
+        res = gst_rtspsrc_handle_request (src, &src->conninfo, &message);
         if (res == GST_RTSP_EEOF)
           goto server_eof;
         else if (res < 0)
@@ -4800,7 +4809,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
     /* we should continue reading the TCP socket because the server might
      * send us requests. When the session timeout expires, we need to send a
      * keep-alive request to keep the session open. */
-    res = gst_rtspsrc_connection_receive (src, src->conninfo.connection,
+    res = gst_rtspsrc_connection_receive (src, &src->conninfo,
         &message, &tv_timeout);
 
     switch (res) {
@@ -4840,9 +4849,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
     switch (message.type) {
       case GST_RTSP_MESSAGE_REQUEST:
         /* server sends us a request message, handle it */
-        res =
-            gst_rtspsrc_handle_request (src, src->conninfo.connection,
-            &message);
+        res = gst_rtspsrc_handle_request (src, &src->conninfo, &message);
         if (res == GST_RTSP_EEOF)
           goto server_eof;
         else if (res < 0)
@@ -5384,7 +5391,7 @@ no_user_pass:
 }
 
 static GstRTSPResult
-gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnection * conn,
+gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * request, GstRTSPMessage * response,
     GstRTSPStatusCode * code)
 {
@@ -5402,14 +5409,16 @@ again:
   if (src->debug)
     gst_rtsp_message_dump (request);
 
-  res = gst_rtspsrc_connection_send (src, conn, request, src->ptcp_timeout);
+  res = gst_rtspsrc_connection_send (src, conninfo, request, src->ptcp_timeout);
   if (res < 0)
     goto send_error;
 
-  gst_rtsp_connection_reset_timeout (conn);
+  gst_rtsp_connection_reset_timeout (conninfo->connection);
 
 next:
-  res = gst_rtspsrc_connection_receive (src, conn, response, src->ptcp_timeout);
+  res =
+      gst_rtspsrc_connection_receive (src, conninfo, response,
+      src->ptcp_timeout);
   if (res < 0)
     goto receive_error;
 
@@ -5418,7 +5427,7 @@ next:
 
   switch (response->type) {
     case GST_RTSP_MESSAGE_REQUEST:
-      res = gst_rtspsrc_handle_request (src, conn, response);
+      res = gst_rtspsrc_handle_request (src, conninfo, response);
       if (res == GST_RTSP_EEOF)
         goto server_eof;
       else if (res < 0)
@@ -5546,7 +5555,7 @@ server_eof:
  * Returns: #GST_RTSP_OK if the processing was successful.
  */
 static GstRTSPResult
-gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn,
+gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * request, GstRTSPMessage * response,
     GstRTSPStatusCode * code)
 {
@@ -5568,7 +5577,8 @@ gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn,
     method = request->type_data.request.method;
 
     if ((res =
-            gst_rtspsrc_try_send (src, conn, request, response, &int_code)) < 0)
+            gst_rtspsrc_try_send (src, conninfo, request, response,
+                &int_code)) < 0)
       goto error;
 
     switch (int_code) {
@@ -5671,8 +5681,7 @@ static GstRTSPResult
 gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request,
     GstRTSPMessage * response, GstRTSPSrc * src)
 {
-  return gst_rtspsrc_send (src, src->conninfo.connection, request, response,
-      NULL);
+  return gst_rtspsrc_send (src, &src->conninfo, request, response, NULL);
 }
 
 
@@ -6041,7 +6050,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
     goto no_streams;
 
   for (walk = src->streams; walk; walk = g_list_next (walk)) {
-    GstRTSPConnection *conn;
+    GstRTSPConnInfo *conninfo;
     gchar *transports;
     gint retry = 0;
     guint mask = 0;
@@ -6106,9 +6115,9 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
         GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream);
         continue;
       }
-      conn = stream->conninfo.connection;
+      conninfo = &stream->conninfo;
     } else {
-      conn = src->conninfo.connection;
+      conninfo = &src->conninfo;
     }
     GST_DEBUG_OBJECT (src, "doing setup of stream %p with %s", stream,
         stream->conninfo.location);
@@ -6185,7 +6194,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
               stream->id));
 
     /* handle the code ourselves */
-    res = gst_rtspsrc_send (src, conn, &request, &response, &code);
+    res = gst_rtspsrc_send (src, conninfo, &request, &response, &code);
     if (res < 0)
       goto send_error;
 
@@ -6705,7 +6714,7 @@ restart:
     GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving server options"));
 
   if ((res =
-          gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
+          gst_rtspsrc_send (src, &src->conninfo, &request, &response,
               NULL)) < 0)
     goto send_error;
 
@@ -6732,7 +6741,7 @@ restart:
     GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving media info"));
 
   if ((res =
-          gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
+          gst_rtspsrc_send (src, &src->conninfo, &request, &response,
               NULL)) < 0)
     goto send_error;
 
@@ -6955,9 +6964,7 @@ gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, gboolean only_close)
     if (async)
       GST_ELEMENT_PROGRESS (src, CONTINUE, "close", ("Closing stream"));
 
-    if ((res =
-            gst_rtspsrc_send (src, info->connection, &request, &response,
-                NULL)) < 0)
+    if ((res = gst_rtspsrc_send (src, info, &request, &response, NULL)) < 0)
       goto send_error;
 
     /* FIXME, parse result? */
@@ -7249,7 +7256,7 @@ restart:
   for (walk = src->streams; walk; walk = g_list_next (walk)) {
     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
     const gchar *setup_url;
-    GstRTSPConnection *conn;
+    GstRTSPConnInfo *conninfo;
 
     /* try aggregate control first but do non-aggregate control otherwise */
     if (control)
@@ -7258,9 +7265,9 @@ restart:
       continue;
 
     if (src->conninfo.connection) {
-      conn = src->conninfo.connection;
+      conninfo = &src->conninfo;
     } else if (stream->conninfo.connection) {
-      conn = stream->conninfo.connection;
+      conninfo = &stream->conninfo;
     } else {
       continue;
     }
@@ -7292,7 +7299,7 @@ restart:
     if (async)
       GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("Sending PLAY request"));
 
-    if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
+    if ((res = gst_rtspsrc_send (src, conninfo, &request, &response, NULL)) < 0)
       goto send_error;
 
     if (src->need_redirect) {
@@ -7470,7 +7477,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
    * aggregate control */
   for (walk = src->streams; walk; walk = g_list_next (walk)) {
     GstRTSPStream *stream = (GstRTSPStream *) walk->data;
-    GstRTSPConnection *conn;
+    GstRTSPConnInfo *conninfo;
     const gchar *setup_url;
 
     /* try aggregate control first but do non-aggregate control otherwise */
@@ -7480,9 +7487,9 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
       continue;
 
     if (src->conninfo.connection) {
-      conn = src->conninfo.connection;
+      conninfo = &src->conninfo;
     } else if (stream->conninfo.connection) {
-      conn = stream->conninfo.connection;
+      conninfo = &stream->conninfo;
     } else {
       continue;
     }
@@ -7496,7 +7503,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
                 setup_url)) < 0)
       goto create_request_failed;
 
-    if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
+    if ((res = gst_rtspsrc_send (src, conninfo, &request, &response, NULL)) < 0)
       goto send_error;
 
     gst_rtsp_message_unset (&request);
index cb71342..87ec1a1 100644 (file)
@@ -86,6 +86,9 @@ struct _GstRTSPConnInfo {
   GstRTSPConnection  *connection;
   gboolean            connected;
   gboolean            flushing;
+
+  GMutex              send_lock;
+  GMutex              recv_lock;
 };
 
 typedef struct _GstRTSPStream GstRTSPStream;