rtspconnection: protect cancellable by a mutex
authorMatthew Waters <matthew@centricular.com>
Wed, 27 Jul 2022 05:42:44 +0000 (15:42 +1000)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 28 Jul 2022 10:32:15 +0000 (10:32 +0000)
It is entirely possible for the cancellable to be cancelled (and freed)
in gst_rtsp_connection_flush() while there may be an ongoing read/write
operation.

Nothing prevents gst_rtsp_connection_flush() from waiting for the
outstanding read/writes.

This could lead to a crash like (where cancellable has been freed
within gst_rtsp_connection_flush()):

 #0  0x00007ffff4351096 in g_output_stream_writev (stream=stream@entry=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6af950,  cancellable=cancellable@entry=0x7fff300288a0, error=error@entry=0x7ffe2c6af958) at ../subprojects/glib/gio/goutputstream.c:377
 #1  0x00007ffff44b2c38 in writev_bytes (stream=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6afb90, block=block@entry=1, cancellable=0x7fff300288a0) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:1320
 #2  0x00007ffff44b583e in gst_rtsp_connection_send_messages_usec (conn=0x7fff30001370, messages=messages@entry=0x7ffe2c6afcc0, n_messages=n_messages@entry=1, timeout=timeout@entry=3000000) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:2056
 #3  0x00007ffff44d2669 in gst_rtsp_client_sink_connection_send_messages (sink=0x7fffac0192c0, timeout=3000000, n_messages=1, messages=0x7ffe2c6afcc0, conninfo=0x7fffac019610) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:1929
 #4  gst_rtsp_client_sink_try_send (sink=sink@entry=0x7fffac0192c0, conninfo=conninfo@entry=0x7fffac019610, requests=requests@entry=0x7ffe2c6afcc0, n_requests=n_requests@entry=1, response=response@entry=0x0, code=code@entry=0x0) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:2845
 #5  0x00007ffff44d3077 in do_send_data (buffer=0x7fff38075c60, channel=<optimized out>, context=0x7fffac042640) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:3896
 #6  0x00007ffff4281cc6 in gst_rtsp_stream_transport_send_rtp (trans=trans@entry=0x7fff20061f80, buffer=<optimized out>) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream-transport.c:632
 #7  0x00007ffff4278e9b in push_data (stream=0x7fff40019bf0, is_rtp=<optimized out>, buffer_list=0x0, buffer=<optimized out>, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2586
 #8  check_transport_backlog (stream=0x7fff40019bf0, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2645
 #9  0x00007ffff42793b3 in send_tcp_message (idx=<optimized out>, stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2741
 #10 send_func (stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2776
 #11 0x00007ffff7d59fad in g_thread_proxy (data=0x7fffbc062920) at ../subprojects/glib/glib/gthread.c:827
 #12 0x00007ffff7a8ce2d in start_thread () from /lib64/libc.so.6
 #13 0x00007ffff7b12620 in clone3 () from /lib64/libc.so.6

Fix by adding a cancellable lock and returning an extra reference used
across all read/write operations.  gst_rtsp_connection_flush() can free
the in-use cancellable and it will no longer affect any in progress
read/write.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2799>

subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c

index 67cc4f5..42bd410 100644 (file)
@@ -172,7 +172,8 @@ struct _GstRTSPConnection
   GMutex socket_use_mutex;
   gboolean manual_http;
   gboolean may_cancel;
-  GCancellable *cancellable;
+  GMutex cancellable_mutex;
+  GCancellable *cancellable;    /* protected by cancellable_mutex */
 
   gchar tunnelid[TUNNELID_LEN];
   gboolean tunneled;
@@ -352,6 +353,20 @@ socket_client_event (GSocketClient * client, GSocketClientEvent event,
   }
 }
 
+/* transfer full */
+static GCancellable *
+get_cancellable (GstRTSPConnection * conn)
+{
+  GCancellable *cancellable = NULL;
+
+  g_mutex_lock (&conn->cancellable_mutex);
+  if (conn->cancellable)
+    cancellable = g_object_ref (conn->cancellable);
+  g_mutex_unlock (&conn->cancellable_mutex);
+
+  return cancellable;
+}
+
 /**
  * gst_rtsp_connection_create:
  * @url: a #GstRTSPUrl
@@ -377,6 +392,7 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
 
   newconn->may_cancel = TRUE;
   newconn->cancellable = g_cancellable_new ();
+  g_mutex_init (&newconn->cancellable_mutex);
   newconn->client = g_socket_client_new ();
 
   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
@@ -839,6 +855,7 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
   gchar *connection_uri = NULL;
   gchar *request_uri = NULL;
   gchar *host = NULL;
+  GCancellable *cancellable;
 
   url = conn->url;
 
@@ -896,18 +913,23 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
 
   connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
 
+  cancellable = get_cancellable (conn);
+
   /* connect to the host/port */
   if (conn->proxy_host) {
     connection = g_socket_client_connect_to_host (conn->client,
-        conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+        conn->proxy_host, conn->proxy_port, cancellable, &error);
     request_uri = g_strdup (connection_uri);
   } else {
     connection = g_socket_client_connect_to_uri (conn->client,
-        connection_uri, 0, conn->cancellable, &error);
+        connection_uri, 0, cancellable, &error);
     request_uri =
         g_strdup_printf ("%s%s%s", url->abspath,
         url->query ? "?" : "", url->query ? url->query : "");
   }
+
+  g_clear_object (&cancellable);
+
   if (connection == NULL)
     goto connect_failed;
 
@@ -1033,6 +1055,7 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
   GstClockTime to;
   guint16 url_port;
   GstRTSPUrl *url;
+  GCancellable *cancellable;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
@@ -1052,18 +1075,23 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
     connection_uri = gst_rtsp_url_get_request_uri (url);
   }
 
+  cancellable = get_cancellable (conn);
+
   if (conn->proxy_host) {
     connection = g_socket_client_connect_to_host (conn->client,
-        conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+        conn->proxy_host, conn->proxy_port, cancellable, &error);
     request_uri = g_strdup (connection_uri);
   } else {
     connection = g_socket_client_connect_to_uri (conn->client,
-        connection_uri, url_port, conn->cancellable, &error);
+        connection_uri, url_port, cancellable, &error);
 
     /* use the relative component of the uri for non-proxy connections */
     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
         url->query ? "?" : "", url->query ? url->query : "");
   }
+
+  g_clear_object (&cancellable);
+
   if (connection == NULL)
     goto connect_failed;
 
@@ -1289,6 +1317,8 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
   /* ERRORS */
 error:
   {
+    g_object_unref (cancellable);
+
     if (G_UNLIKELY (r == 0))
       return GST_RTSP_EEOF;
 
@@ -1396,13 +1426,19 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
   if (G_LIKELY (size > (guint) out)) {
     gssize r;
     gsize count = size - out;
+    GCancellable *cancellable;
+
+    cancellable = conn->may_cancel ? get_cancellable (conn) : NULL;
+
     if (block)
       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
-          count, conn->may_cancel ? conn->cancellable : NULL, err);
+          count, cancellable, err);
     else
       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
           (conn->input_stream), (gchar *) & buffer[out], count,
-          conn->may_cancel ? conn->cancellable : NULL, err);
+          cancellable, err);
+
+    g_clear_object (&cancellable);
 
     if (G_UNLIKELY (r < 0)) {
       if (out == 0) {
@@ -1706,6 +1742,7 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
 {
   guint offset;
   GstRTSPResult res;
+  GCancellable *cancellable;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
@@ -1715,9 +1752,10 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
 
   set_write_socket_timeout (conn, timeout);
 
+  cancellable = get_cancellable (conn);
   res =
-      write_bytes (conn->output_stream, data, &offset, size, TRUE,
-      conn->cancellable);
+      write_bytes (conn->output_stream, data, &offset, size, TRUE, cancellable);
+  g_clear_object (&cancellable);
 
   clear_write_socket_timeout (conn);
 
@@ -1911,6 +1949,7 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
   guint n_vectors, n_memories;
   gint i, j, k;
   gsize bytes_to_write, bytes_written;
+  GCancellable *cancellable;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
@@ -2027,9 +2066,11 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
   /* write request: this is synchronous */
   set_write_socket_timeout (conn, timeout);
 
+  cancellable = get_cancellable (conn);
   res =
       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
-      TRUE, conn->cancellable);
+      TRUE, cancellable);
+  g_clear_object (&cancellable);
 
   clear_write_socket_timeout (conn);
 
@@ -2902,8 +2943,10 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
 
   res = gst_rtsp_connection_close (conn);
 
-  if (conn->cancellable)
-    g_object_unref (conn->cancellable);
+  g_mutex_lock (&conn->cancellable_mutex);
+  g_clear_object (&conn->cancellable);
+  g_mutex_unlock (&conn->cancellable_mutex);
+  g_mutex_clear (&conn->cancellable_mutex);
   if (conn->client)
     g_object_unref (conn->client);
   if (conn->tls_database)
@@ -2949,6 +2992,7 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
   GMainContext *ctx;
   GSource *rs, *ws, *ts;
   GIOCondition condition;
+  GCancellable *cancellable;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
@@ -2966,21 +3010,22 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
     g_source_unref (ts);
   }
 
+  cancellable = get_cancellable (conn);
   if (events & GST_RTSP_EV_READ) {
     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
-        conn->cancellable);
+        cancellable);
     g_source_set_dummy_callback (rs);
     g_source_attach (rs, ctx);
     g_source_unref (rs);
   }
 
   if (events & GST_RTSP_EV_WRITE) {
-    ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
-        conn->cancellable);
+    ws = g_socket_create_source (conn->write_socket, G_IO_OUT, cancellable);
     g_source_set_dummy_callback (ws);
     g_source_attach (ws, ctx);
     g_source_unref (ws);
   }
+  g_clear_object (&cancellable);
 
   /* Returns after handling all pending events */
   while (!g_main_context_iteration (ctx, TRUE));
@@ -3089,10 +3134,14 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
 
   if (flush) {
-    g_cancellable_cancel (conn->cancellable);
+    GCancellable *cancellable = get_cancellable (conn);
+    g_cancellable_cancel (cancellable);
+    g_clear_object (&cancellable);
   } else {
+    g_mutex_lock (&conn->cancellable_mutex);
     g_object_unref (conn->cancellable);
     conn->cancellable = g_cancellable_new ();
+    g_mutex_unlock (&conn->cancellable_mutex);
   }
 
   return GST_RTSP_OK;
@@ -3606,6 +3655,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
     }
 
     /* clean up some of the state of conn2 */
+    g_mutex_lock (&conn2->cancellable_mutex);
     g_cancellable_cancel (conn2->cancellable);
     conn2->write_socket = conn2->read_socket = NULL;
     conn2->socket0 = NULL;
@@ -3616,6 +3666,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
     conn2->control_stream = NULL;
     g_object_unref (conn2->cancellable);
     conn2->cancellable = NULL;
+    g_mutex_unlock (&conn2->cancellable_mutex);
 
     /* We make socket0 the write socket and socket1 the read socket. */
     conn->write_socket = conn->socket0;
@@ -3974,6 +4025,7 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
     guint n_vectors, n_memories, n_ids, drop_messages;
     gint i, j, l, n_mmap;
     GstRTSPSerializedMessage *msg;
+    GCancellable *cancellable;
 
     /* if this connection was already closed, stop now */
     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
@@ -4099,11 +4151,15 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
       }
     }
 
+    cancellable = get_cancellable (watch->conn);
+
     res =
         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
-        &bytes_written, FALSE, watch->conn->cancellable);
+        &bytes_written, FALSE, cancellable);
     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
 
+    g_clear_object (&cancellable);
+
     /* First unmap all memories here, this simplifies the code below
      * as we don't have to skip all memories that were already written
      * before */
@@ -4479,6 +4535,7 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
 {
   GstRTSPResult res;
   GMainContext *context = NULL;
+  GCancellable *cancellable;
   gint i;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
@@ -4539,11 +4596,15 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
       }
     }
 
+    cancellable = get_cancellable (watch->conn);
+
     res =
         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
-        &bytes_written, FALSE, watch->conn->cancellable);
+        &bytes_written, FALSE, cancellable);
     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
 
+    g_clear_object (&cancellable);
+
     /* At this point we sent everything we could without blocking or
      * error and updated the offsets inside the message accordingly */