From d2d00e07acc2b1ab1ae5a728ef5dc33c9dee7869 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 18 Mar 2020 17:55:38 +0100 Subject: [PATCH] srt: Clean up locking Use GST_OBJECT_LOCK (srtobject->element) to protect only the fields involved in property access. Introduce a new mutex srtobject->sock_lock to go with srtobject->sock_cond and protect the list of callers from concurrent access. --- ext/srt/gstsrtobject.c | 175 +++++++++++++++++++++++++++++++------------------ ext/srt/gstsrtobject.h | 6 +- ext/srt/gstsrtsink.c | 7 +- ext/srt/gstsrtsrc.c | 7 +- 4 files changed, 128 insertions(+), 67 deletions(-) diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c index f726f4c..27b448e 100644 --- a/ext/srt/gstsrtobject.c +++ b/ext/srt/gstsrtobject.c @@ -88,6 +88,7 @@ srt_caller_free (SRTCaller * caller) g_free (caller); } +/* called with sock_lock */ static void srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject) { @@ -132,12 +133,14 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject, { struct srt_constant_params *params = srt_params; + GST_OBJECT_LOCK (srtobject->element); + for (; params->name != NULL; params++) { if (srt_setsockopt (sock, 0, params->param, ¶ms->val, sizeof (gint))) { g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "failed to set %s (reason: %s)", params->name, srt_getlasterror_str ()); - return FALSE; + goto err; } } @@ -149,7 +152,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject, g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "failed to set passphrase (reason: %s)", srt_getlasterror_str ()); - return FALSE; + goto err; } if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) { @@ -159,7 +162,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject, if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) { g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ()); - return FALSE; + goto err; } } @@ -171,7 +174,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject, if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) { g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "failed to set latency (reason: %s)", srt_getlasterror_str ()); - return FALSE; + goto err; } } @@ -187,7 +190,12 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject, } } + GST_OBJECT_UNLOCK (srtobject->element); return TRUE; + +err: + GST_OBJECT_UNLOCK (srtobject->element); + return FALSE; } GstSRTObject * @@ -254,12 +262,12 @@ gboolean gst_srt_object_set_property_helper (GstSRTObject * srtobject, guint prop_id, const GValue * value, GParamSpec * pspec) { + GST_OBJECT_LOCK (srtobject->element); + switch (prop_id) { - case PROP_URI:{ - const gchar *uri = g_value_get_string (value); - gst_srt_object_set_uri (srtobject, uri, NULL); + case PROP_URI: + gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL); break; - } case PROP_MODE: gst_structure_set_value (srtobject->parameters, "mode", value); break; @@ -283,20 +291,26 @@ gst_srt_object_set_property_helper (GstSRTObject * srtobject, gst_structure_set_value (srtobject->parameters, "pbkeylen", value); break; case PROP_WAIT_FOR_CONNECTION: - GST_OBJECT_LOCK (srtobject->element); srtobject->wait_for_connection = g_value_get_boolean (value); - GST_OBJECT_UNLOCK (srtobject->element); break; default: - return FALSE; + goto err; } + + GST_OBJECT_UNLOCK (srtobject->element); return TRUE; + +err: + GST_OBJECT_UNLOCK (srtobject->element); + return FALSE; } gboolean gst_srt_object_get_property_helper (GstSRTObject * srtobject, guint prop_id, GValue * value, GParamSpec * pspec) { + GST_OBJECT_LOCK (srtobject->element); + switch (prop_id) { case PROP_URI: g_value_take_string (value, gst_uri_to_string (srtobject->uri)); @@ -355,15 +369,19 @@ gst_srt_object_get_property_helper (GstSRTObject * srtobject, case PROP_STATS: g_value_take_boxed (value, gst_srt_object_get_stats (srtobject)); break; - case PROP_WAIT_FOR_CONNECTION:{ + case PROP_WAIT_FOR_CONNECTION: g_value_set_boolean (value, srtobject->wait_for_connection); break; - } default: - return FALSE; + goto err; } + GST_OBJECT_UNLOCK (srtobject->element); return TRUE; + +err: + GST_OBJECT_UNLOCK (srtobject->element); + return FALSE; } void @@ -563,6 +581,7 @@ gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri) } } +/* called with GST_OBJECT_LOCK (srtobject->element) held */ gboolean gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri, GError ** err) @@ -653,10 +672,12 @@ thread_func (gpointer data) gint rsocklen = 1; for (;;) { + GST_OBJECT_LOCK (srtobject->element); if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &poll_timeout)) { poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; } + GST_OBJECT_UNLOCK (srtobject->element); GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller"); @@ -706,10 +727,10 @@ thread_func (gpointer data) continue; } - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); srtobject->callers = g_list_append (srtobject->callers, caller); g_cond_signal (&srtobject->sock_cond); - GST_OBJECT_UNLOCK (srtobject->element); + g_mutex_unlock (&srtobject->sock_lock); /* notifying caller-added */ if (srtobject->caller_added_closure != NULL) { @@ -749,6 +770,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, gsize bind_sa_len; GSocketAddress *bind_addr; + GST_OBJECT_LOCK (srtobject->element); + gst_structure_get_uint (srtobject->parameters, "localport", &local_port); local_address = @@ -756,6 +779,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject, if (local_address == NULL) local_address = GST_SRT_DEFAULT_LOCALADDRESS; + GST_OBJECT_UNLOCK (srtobject->element); + bind_addr = g_inet_socket_address_new_from_string (local_address, local_port); bind_sa_len = g_socket_address_get_native_size (bind_addr); bind_sa = g_alloca (bind_sa_len); @@ -881,9 +906,12 @@ gst_srt_object_connect (GstSRTObject * srtobject, goto failed; } + GST_OBJECT_LOCK (srtobject->element); gst_structure_get_uint (srtobject->parameters, "localport", &local_port); local_address = gst_structure_get_string (srtobject->parameters, "localaddress"); + GST_OBJECT_UNLOCK (srtobject->element); + /* According to SRT norm, bind local address and port if specified */ if (local_address != NULL && local_port != 0) { gpointer bind_sa; @@ -983,6 +1011,10 @@ gst_srt_object_open_full (GstSRTObject * srtobject, gpointer sa; size_t sa_len; const gchar *addr_str; + guint port; + gboolean ret = FALSE; + + GST_OBJECT_LOCK (srtobject->element); srtobject->opened = FALSE; @@ -1001,7 +1033,6 @@ gst_srt_object_open_full (GstSRTObject * srtobject, } addr_str = gst_uri_get_host (srtobject->uri); - if (addr_str == NULL) { addr_str = GST_SRT_DEFAULT_LOCALADDRESS; GST_DEBUG_OBJECT (srtobject->element, @@ -1009,9 +1040,22 @@ gst_srt_object_open_full (GstSRTObject * srtobject, " setting listener mode", addr_str); } - socket_address = - g_inet_socket_address_new_from_string (addr_str, - gst_uri_get_port (srtobject->uri)); + port = gst_uri_get_port (srtobject->uri); + + GST_DEBUG_OBJECT (srtobject->element, + "Opening SRT socket with parameters: %" GST_PTR_FORMAT, + srtobject->parameters); + + if (!gst_structure_get_enum (srtobject->parameters, + "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) { + GST_WARNING_OBJECT (srtobject->element, + "Cannot get connection mode information." " Use default mode"); + connection_mode = GST_TYPE_SRT_CONNECTION_MODE; + } + + GST_OBJECT_UNLOCK (srtobject->element); + + socket_address = g_inet_socket_address_new_from_string (addr_str, port); if (socket_address == NULL) { g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, @@ -1033,33 +1077,26 @@ gst_srt_object_open_full (GstSRTObject * srtobject, goto out; } - GST_DEBUG_OBJECT (srtobject->element, - "Opening SRT socket with parameters: %" GST_PTR_FORMAT, - srtobject->parameters); - - if (!gst_structure_get_enum (srtobject->parameters, - "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) { - GST_WARNING_OBJECT (srtobject->element, - "Cannot get connection mode information." " Use default mode"); - connection_mode = GST_TYPE_SRT_CONNECTION_MODE; - } - srtobject->listener_poll_id = srt_epoll_create (); - srtobject->opened = + ret = gst_srt_object_open_connection (srtobject, cancellable, connection_mode, sa, sa_len, error); + GST_OBJECT_LOCK (srtobject->element); + srtobject->opened = ret; + GST_OBJECT_UNLOCK (srtobject->element); + out: g_clear_object (&socket_address); - return srtobject->opened; + return ret; } void gst_srt_object_close (GstSRTObject * srtobject) { - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); if (srtobject->poll_id != SRT_ERROR) { srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock); } @@ -1080,9 +1117,9 @@ gst_srt_object_close (GstSRTObject * srtobject) } if (srtobject->thread) { GThread *thread = g_steal_pointer (&srtobject->thread); - GST_OBJECT_UNLOCK (srtobject->element); + g_mutex_unlock (&srtobject->sock_lock); g_thread_join (thread); - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); } if (srtobject->listener_sock != SRT_INVALID_SOCK) { @@ -1095,16 +1132,17 @@ gst_srt_object_close (GstSRTObject * srtobject) if (srtobject->callers) { GList *callers = g_steal_pointer (&srtobject->callers); - GST_OBJECT_UNLOCK (srtobject->element); g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure, srtobject); - GST_OBJECT_LOCK (srtobject->element); g_list_free_full (callers, (GDestroyNotify) srt_caller_free); } g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref); g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref); + g_mutex_unlock (&srtobject->sock_lock); + + GST_OBJECT_LOCK (srtobject->element); srtobject->opened = FALSE; GST_OBJECT_UNLOCK (srtobject->element); } @@ -1117,15 +1155,14 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject, GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller"); - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); while (!g_cancellable_is_cancelled (cancellable)) { ret = (srtobject->callers != NULL); if (ret) break; - g_cond_wait (&srtobject->sock_cond, - GST_OBJECT_GET_LOCK (srtobject->element)); + g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock); } - GST_OBJECT_UNLOCK (srtobject->element); + g_mutex_unlock (&srtobject->sock_lock); GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no"); @@ -1145,31 +1182,35 @@ gst_srt_object_read (GstSRTObject * srtobject, g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) == GST_URI_SRC, -1); + GST_OBJECT_LOCK (srtobject->element); + gst_structure_get_enum (srtobject->parameters, "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode); - if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - SRTCaller *caller; + if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", + &poll_timeout)) { + poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; + } + GST_OBJECT_UNLOCK (srtobject->element); + + if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) return -1; - GST_OBJECT_LOCK (srtobject->element); - caller = srtobject->callers->data; - if (srtobject->callers) + g_mutex_lock (&srtobject->sock_lock); + if (srtobject->callers) { + SRTCaller *caller = srtobject->callers->data; poll_id = caller->poll_id; - GST_OBJECT_UNLOCK (srtobject->element); + } + g_mutex_unlock (&srtobject->sock_lock); + if (poll_id == SRT_ERROR) return 0; } else { poll_id = srtobject->poll_id; } - if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", - &poll_timeout)) { - poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; - } - while (!g_cancellable_is_cancelled (cancellable)) { SRTSOCKET rsock; @@ -1236,13 +1277,13 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable) /* connection is only waited for in listener mode, * but there is no harm in raising signal in any case */ - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); /* however, a race might be harmful ... * the cancellation is used as 'flushing' flag here, * so make sure it is so detected by the intended part at proper time */ g_cancellable_cancel (cancellable); g_cond_signal (&srtobject->sock_cond); - GST_OBJECT_UNLOCK (srtobject->element); + g_mutex_unlock (&srtobject->sock_lock); } static gboolean @@ -1305,7 +1346,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, { GList *callers; - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); callers = srtobject->callers; while (callers != NULL) { gssize len = 0; @@ -1317,8 +1358,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, callers = callers->next; if (g_cancellable_is_cancelled (cancellable)) { - GST_OBJECT_UNLOCK (srtobject->element); - return -1; + goto cancelled; } if (!caller->sent_headers) { @@ -1352,9 +1392,12 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, srt_caller_free (caller); } - GST_OBJECT_UNLOCK (srtobject->element); - + g_mutex_unlock (&srtobject->sock_lock); return mapinfo->size; + +cancelled: + g_mutex_unlock (&srtobject->sock_lock); + return -1; } static gssize @@ -1367,10 +1410,12 @@ gst_srt_object_write_one (GstSRTObject * srtobject, const guint8 *msg = mapinfo->data; gint payload_size, optlen = 1; + GST_OBJECT_LOCK (srtobject->element); if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &poll_timeout)) { poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT; } + GST_OBJECT_UNLOCK (srtobject->element); if (!srtobject->sent_headers) { if (!gst_srt_object_send_headers (srtobject, srtobject->sock, @@ -1443,16 +1488,20 @@ gst_srt_object_write (GstSRTObject * srtobject, { gssize len = 0; GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE; + gboolean wait_for_connection; /* Only sink element can write data */ g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) == GST_URI_SINK, -1); + GST_OBJECT_LOCK (srtobject->element); gst_structure_get_enum (srtobject->parameters, "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode); + wait_for_connection = srtobject->wait_for_connection; + GST_OBJECT_UNLOCK (srtobject->element); if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - if (srtobject->wait_for_connection) { + if (wait_for_connection) { if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) return -1; } @@ -1534,7 +1583,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) GstStructure *s = NULL; gboolean is_sender = GST_IS_BASE_SINK (srtobject->element); - GST_OBJECT_LOCK (srtobject->element); + g_mutex_lock (&srtobject->sock_lock); if (srtobject->sock != SRT_INVALID_SOCK) { s = get_stats_for_srtsock (srtobject->sock, is_sender); goto done; @@ -1564,7 +1613,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject) } done: - GST_OBJECT_UNLOCK (srtobject->element); + g_mutex_unlock (&srtobject->sock_lock); return s; } diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h index f014434..754ebf2 100644 --- a/ext/srt/gstsrtobject.h +++ b/ext/srt/gstsrtobject.h @@ -56,14 +56,16 @@ struct _GstSRTObject gint poll_id; gboolean sent_headers; - GCond sock_cond; - GTask *listener_task; SRTSOCKET listener_sock; gint listener_poll_id; GThread *thread; + /* Protects the list of callers */ + GMutex sock_lock; + GCond sock_cond; + GList *callers; GClosure *caller_added_closure; diff --git a/ext/srt/gstsrtsink.c b/ext/srt/gstsrtsink.c index 76a5882..9a7df63 100644 --- a/ext/srt/gstsrtsink.c +++ b/ext/srt/gstsrtsink.c @@ -374,8 +374,13 @@ gst_srt_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error) { GstSRTSink *self = GST_SRT_SINK (handler); + gboolean ret; - return gst_srt_object_set_uri (self->srtobject, uri, error); + GST_OBJECT_LOCK (self); + ret = gst_srt_object_set_uri (self->srtobject, uri, error); + GST_OBJECT_UNLOCK (self); + + return ret; } static void diff --git a/ext/srt/gstsrtsrc.c b/ext/srt/gstsrtsrc.c index 7bf03d6..14c1dbb 100644 --- a/ext/srt/gstsrtsrc.c +++ b/ext/srt/gstsrtsrc.c @@ -342,8 +342,13 @@ gst_srt_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error) { GstSRTSrc *self = GST_SRT_SRC (handler); + gboolean ret; - return gst_srt_object_set_uri (self->srtobject, uri, error); + GST_OBJECT_LOCK (self); + ret = gst_srt_object_set_uri (self->srtobject, uri, error); + GST_OBJECT_UNLOCK (self); + + return ret; } static void -- 2.7.4