From: Matthew Waters Date: Wed, 27 Jul 2022 05:42:44 +0000 (+1000) Subject: rtspconnection: protect cancellable by a mutex X-Git-Tag: 1.22.0~1215 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=598f2ab73f3f4d104e1dddd5bac0db4fab69584a;p=platform%2Fupstream%2Fgstreamer.git rtspconnection: protect cancellable by a mutex It is entirely possible for the cancellable to be cancelled (and freed) in gst_rtsp_connection_flush() while there may be an ongoing read/write operation. Nothing prevents gst_rtsp_connection_flush() from waiting for the outstanding read/writes. This could lead to a crash like (where cancellable has been freed within gst_rtsp_connection_flush()): #0 0x00007ffff4351096 in g_output_stream_writev (stream=stream@entry=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6af950, cancellable=cancellable@entry=0x7fff300288a0, error=error@entry=0x7ffe2c6af958) at ../subprojects/glib/gio/goutputstream.c:377 #1 0x00007ffff44b2c38 in writev_bytes (stream=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6afb90, block=block@entry=1, cancellable=0x7fff300288a0) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:1320 #2 0x00007ffff44b583e in gst_rtsp_connection_send_messages_usec (conn=0x7fff30001370, messages=messages@entry=0x7ffe2c6afcc0, n_messages=n_messages@entry=1, timeout=timeout@entry=3000000) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:2056 #3 0x00007ffff44d2669 in gst_rtsp_client_sink_connection_send_messages (sink=0x7fffac0192c0, timeout=3000000, n_messages=1, messages=0x7ffe2c6afcc0, conninfo=0x7fffac019610) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:1929 #4 gst_rtsp_client_sink_try_send (sink=sink@entry=0x7fffac0192c0, conninfo=conninfo@entry=0x7fffac019610, requests=requests@entry=0x7ffe2c6afcc0, n_requests=n_requests@entry=1, response=response@entry=0x0, code=code@entry=0x0) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:2845 #5 0x00007ffff44d3077 in do_send_data (buffer=0x7fff38075c60, channel=, context=0x7fffac042640) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:3896 #6 0x00007ffff4281cc6 in gst_rtsp_stream_transport_send_rtp (trans=trans@entry=0x7fff20061f80, buffer=) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream-transport.c:632 #7 0x00007ffff4278e9b in push_data (stream=0x7fff40019bf0, is_rtp=, buffer_list=0x0, buffer=, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2586 #8 check_transport_backlog (stream=0x7fff40019bf0, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2645 #9 0x00007ffff42793b3 in send_tcp_message (idx=, stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2741 #10 send_func (stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2776 #11 0x00007ffff7d59fad in g_thread_proxy (data=0x7fffbc062920) at ../subprojects/glib/glib/gthread.c:827 #12 0x00007ffff7a8ce2d in start_thread () from /lib64/libc.so.6 #13 0x00007ffff7b12620 in clone3 () from /lib64/libc.so.6 Fix by adding a cancellable lock and returning an extra reference used across all read/write operations. gst_rtsp_connection_flush() can free the in-use cancellable and it will no longer affect any in progress read/write. Part-of: --- diff --git a/subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c b/subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c index 67cc4f5..42bd410 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c @@ -172,7 +172,8 @@ struct _GstRTSPConnection GMutex socket_use_mutex; gboolean manual_http; gboolean may_cancel; - GCancellable *cancellable; + GMutex cancellable_mutex; + GCancellable *cancellable; /* protected by cancellable_mutex */ gchar tunnelid[TUNNELID_LEN]; gboolean tunneled; @@ -352,6 +353,20 @@ socket_client_event (GSocketClient * client, GSocketClientEvent event, } } +/* transfer full */ +static GCancellable * +get_cancellable (GstRTSPConnection * conn) +{ + GCancellable *cancellable = NULL; + + g_mutex_lock (&conn->cancellable_mutex); + if (conn->cancellable) + cancellable = g_object_ref (conn->cancellable); + g_mutex_unlock (&conn->cancellable_mutex); + + return cancellable; +} + /** * gst_rtsp_connection_create: * @url: a #GstRTSPUrl @@ -377,6 +392,7 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) newconn->may_cancel = TRUE; newconn->cancellable = g_cancellable_new (); + g_mutex_init (&newconn->cancellable_mutex); newconn->client = g_socket_client_new (); if (url->transports & GST_RTSP_LOWER_TRANS_TLS) @@ -839,6 +855,7 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri, gchar *connection_uri = NULL; gchar *request_uri = NULL; gchar *host = NULL; + GCancellable *cancellable; url = conn->url; @@ -896,18 +913,23 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri, connection_uri = get_tunneled_connection_uri_strdup (url, url_port); + cancellable = get_cancellable (conn); + /* connect to the host/port */ if (conn->proxy_host) { connection = g_socket_client_connect_to_host (conn->client, - conn->proxy_host, conn->proxy_port, conn->cancellable, &error); + conn->proxy_host, conn->proxy_port, cancellable, &error); request_uri = g_strdup (connection_uri); } else { connection = g_socket_client_connect_to_uri (conn->client, - connection_uri, 0, conn->cancellable, &error); + connection_uri, 0, cancellable, &error); request_uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", url->query ? url->query : ""); } + + g_clear_object (&cancellable); + if (connection == NULL) goto connect_failed; @@ -1033,6 +1055,7 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn, GstClockTime to; guint16 url_port; GstRTSPUrl *url; + GCancellable *cancellable; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL); @@ -1052,18 +1075,23 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn, connection_uri = gst_rtsp_url_get_request_uri (url); } + cancellable = get_cancellable (conn); + if (conn->proxy_host) { connection = g_socket_client_connect_to_host (conn->client, - conn->proxy_host, conn->proxy_port, conn->cancellable, &error); + conn->proxy_host, conn->proxy_port, cancellable, &error); request_uri = g_strdup (connection_uri); } else { connection = g_socket_client_connect_to_uri (conn->client, - connection_uri, url_port, conn->cancellable, &error); + connection_uri, url_port, cancellable, &error); /* use the relative component of the uri for non-proxy connections */ request_uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", url->query ? url->query : ""); } + + g_clear_object (&cancellable); + if (connection == NULL) goto connect_failed; @@ -1289,6 +1317,8 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx, /* ERRORS */ error: { + g_object_unref (cancellable); + if (G_UNLIKELY (r == 0)) return GST_RTSP_EEOF; @@ -1396,13 +1426,19 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, if (G_LIKELY (size > (guint) out)) { gssize r; gsize count = size - out; + GCancellable *cancellable; + + cancellable = conn->may_cancel ? get_cancellable (conn) : NULL; + if (block) r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out], - count, conn->may_cancel ? conn->cancellable : NULL, err); + count, cancellable, err); else r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (conn->input_stream), (gchar *) & buffer[out], count, - conn->may_cancel ? conn->cancellable : NULL, err); + cancellable, err); + + g_clear_object (&cancellable); if (G_UNLIKELY (r < 0)) { if (out == 0) { @@ -1706,6 +1742,7 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data, { guint offset; GstRTSPResult res; + GCancellable *cancellable; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); @@ -1715,9 +1752,10 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data, set_write_socket_timeout (conn, timeout); + cancellable = get_cancellable (conn); res = - write_bytes (conn->output_stream, data, &offset, size, TRUE, - conn->cancellable); + write_bytes (conn->output_stream, data, &offset, size, TRUE, cancellable); + g_clear_object (&cancellable); clear_write_socket_timeout (conn); @@ -1911,6 +1949,7 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn, guint n_vectors, n_memories; gint i, j, k; gsize bytes_to_write, bytes_written; + GCancellable *cancellable; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL); @@ -2027,9 +2066,11 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn, /* write request: this is synchronous */ set_write_socket_timeout (conn, timeout); + cancellable = get_cancellable (conn); res = writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written, - TRUE, conn->cancellable); + TRUE, cancellable); + g_clear_object (&cancellable); clear_write_socket_timeout (conn); @@ -2902,8 +2943,10 @@ gst_rtsp_connection_free (GstRTSPConnection * conn) res = gst_rtsp_connection_close (conn); - if (conn->cancellable) - g_object_unref (conn->cancellable); + g_mutex_lock (&conn->cancellable_mutex); + g_clear_object (&conn->cancellable); + g_mutex_unlock (&conn->cancellable_mutex); + g_mutex_clear (&conn->cancellable_mutex); if (conn->client) g_object_unref (conn->client); if (conn->tls_database) @@ -2949,6 +2992,7 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events, GMainContext *ctx; GSource *rs, *ws, *ts; GIOCondition condition; + GCancellable *cancellable; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (events != 0, GST_RTSP_EINVAL); @@ -2966,21 +3010,22 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events, g_source_unref (ts); } + cancellable = get_cancellable (conn); if (events & GST_RTSP_EV_READ) { rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI, - conn->cancellable); + cancellable); g_source_set_dummy_callback (rs); g_source_attach (rs, ctx); g_source_unref (rs); } if (events & GST_RTSP_EV_WRITE) { - ws = g_socket_create_source (conn->write_socket, G_IO_OUT, - conn->cancellable); + ws = g_socket_create_source (conn->write_socket, G_IO_OUT, cancellable); g_source_set_dummy_callback (ws); g_source_attach (ws, ctx); g_source_unref (ws); } + g_clear_object (&cancellable); /* Returns after handling all pending events */ while (!g_main_context_iteration (ctx, TRUE)); @@ -3089,10 +3134,14 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush) g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); if (flush) { - g_cancellable_cancel (conn->cancellable); + GCancellable *cancellable = get_cancellable (conn); + g_cancellable_cancel (cancellable); + g_clear_object (&cancellable); } else { + g_mutex_lock (&conn->cancellable_mutex); g_object_unref (conn->cancellable); conn->cancellable = g_cancellable_new (); + g_mutex_unlock (&conn->cancellable_mutex); } return GST_RTSP_OK; @@ -3606,6 +3655,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, } /* clean up some of the state of conn2 */ + g_mutex_lock (&conn2->cancellable_mutex); g_cancellable_cancel (conn2->cancellable); conn2->write_socket = conn2->read_socket = NULL; conn2->socket0 = NULL; @@ -3616,6 +3666,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, conn2->control_stream = NULL; g_object_unref (conn2->cancellable); conn2->cancellable = NULL; + g_mutex_unlock (&conn2->cancellable_mutex); /* We make socket0 the write socket and socket1 the read socket. */ conn->write_socket = conn->socket0; @@ -3974,6 +4025,7 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, guint n_vectors, n_memories, n_ids, drop_messages; gint i, j, l, n_mmap; GstRTSPSerializedMessage *msg; + GCancellable *cancellable; /* if this connection was already closed, stop now */ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream || @@ -4099,11 +4151,15 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, } } + cancellable = get_cancellable (watch->conn); + res = writev_bytes (watch->conn->output_stream, vectors, n_vectors, - &bytes_written, FALSE, watch->conn->cancellable); + &bytes_written, FALSE, cancellable); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); + g_clear_object (&cancellable); + /* First unmap all memories here, this simplifies the code below * as we don't have to skip all memories that were already written * before */ @@ -4479,6 +4535,7 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch, { GstRTSPResult res; GMainContext *context = NULL; + GCancellable *cancellable; gint i; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); @@ -4539,11 +4596,15 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch, } } + cancellable = get_cancellable (watch->conn); + res = writev_bytes (watch->conn->output_stream, vectors, n_vectors, - &bytes_written, FALSE, watch->conn->cancellable); + &bytes_written, FALSE, cancellable); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); + g_clear_object (&cancellable); + /* At this point we sent everything we could without blocking or * error and updated the offsets inside the message accordingly */