srt: Clean up locking
authorJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 18 Mar 2020 16:55:38 +0000 (17:55 +0100)
committerJan Alexander Steffens (heftig) <jsteffens@make.tv>
Wed, 15 Apr 2020 08:42:47 +0000 (10:42 +0200)
Use GST_OBJECT_LOCK (srtobject->element) to protect only the fields
involved in property access.

Introduce a new mutex srtobject->sock_lock to go with
srtobject->sock_cond and protect the list of callers from concurrent
access.

ext/srt/gstsrtobject.c
ext/srt/gstsrtobject.h
ext/srt/gstsrtsink.c
ext/srt/gstsrtsrc.c

index f726f4c..27b448e 100644 (file)
@@ -88,6 +88,7 @@ srt_caller_free (SRTCaller * caller)
   g_free (caller);
 }
 
+/* called with sock_lock */
 static void
 srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
 {
@@ -132,12 +133,14 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
 {
   struct srt_constant_params *params = srt_params;
 
+  GST_OBJECT_LOCK (srtobject->element);
+
   for (; params->name != NULL; params++) {
     if (srt_setsockopt (sock, 0, params->param, &params->val, sizeof (gint))) {
       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
           "failed to set %s (reason: %s)", params->name,
           srt_getlasterror_str ());
-      return FALSE;
+      goto err;
     }
   }
 
@@ -149,7 +152,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
           "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
 
-      return FALSE;
+      goto err;
     }
 
     if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
@@ -159,7 +162,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
     if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
           "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
-      return FALSE;
+      goto err;
     }
   }
 
@@ -171,7 +174,7 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
     if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
           "failed to set latency (reason: %s)", srt_getlasterror_str ());
-      return FALSE;
+      goto err;
     }
   }
 
@@ -187,7 +190,12 @@ gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
     }
   }
 
+  GST_OBJECT_UNLOCK (srtobject->element);
   return TRUE;
+
+err:
+  GST_OBJECT_UNLOCK (srtobject->element);
+  return FALSE;
 }
 
 GstSRTObject *
@@ -254,12 +262,12 @@ gboolean
 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
     guint prop_id, const GValue * value, GParamSpec * pspec)
 {
+  GST_OBJECT_LOCK (srtobject->element);
+
   switch (prop_id) {
-    case PROP_URI:{
-      const gchar *uri = g_value_get_string (value);
-      gst_srt_object_set_uri (srtobject, uri, NULL);
+    case PROP_URI:
+      gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
       break;
-    }
     case PROP_MODE:
       gst_structure_set_value (srtobject->parameters, "mode", value);
       break;
@@ -283,20 +291,26 @@ gst_srt_object_set_property_helper (GstSRTObject * srtobject,
       gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
       break;
     case PROP_WAIT_FOR_CONNECTION:
-      GST_OBJECT_LOCK (srtobject->element);
       srtobject->wait_for_connection = g_value_get_boolean (value);
-      GST_OBJECT_UNLOCK (srtobject->element);
       break;
     default:
-      return FALSE;
+      goto err;
   }
+
+  GST_OBJECT_UNLOCK (srtobject->element);
   return TRUE;
+
+err:
+  GST_OBJECT_UNLOCK (srtobject->element);
+  return FALSE;
 }
 
 gboolean
 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
     guint prop_id, GValue * value, GParamSpec * pspec)
 {
+  GST_OBJECT_LOCK (srtobject->element);
+
   switch (prop_id) {
     case PROP_URI:
       g_value_take_string (value, gst_uri_to_string (srtobject->uri));
@@ -355,15 +369,19 @@ gst_srt_object_get_property_helper (GstSRTObject * srtobject,
     case PROP_STATS:
       g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
       break;
-    case PROP_WAIT_FOR_CONNECTION:{
+    case PROP_WAIT_FOR_CONNECTION:
       g_value_set_boolean (value, srtobject->wait_for_connection);
       break;
-    }
     default:
-      return FALSE;
+      goto err;
   }
 
+  GST_OBJECT_UNLOCK (srtobject->element);
   return TRUE;
+
+err:
+  GST_OBJECT_UNLOCK (srtobject->element);
+  return FALSE;
 }
 
 void
@@ -563,6 +581,7 @@ gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
   }
 }
 
+/* called with GST_OBJECT_LOCK (srtobject->element) held */
 gboolean
 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
     GError ** err)
@@ -653,10 +672,12 @@ thread_func (gpointer data)
   gint rsocklen = 1;
 
   for (;;) {
+    GST_OBJECT_LOCK (srtobject->element);
     if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
             &poll_timeout)) {
       poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
     }
+    GST_OBJECT_UNLOCK (srtobject->element);
 
     GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
 
@@ -706,10 +727,10 @@ thread_func (gpointer data)
         continue;
       }
 
-      GST_OBJECT_LOCK (srtobject->element);
+      g_mutex_lock (&srtobject->sock_lock);
       srtobject->callers = g_list_append (srtobject->callers, caller);
       g_cond_signal (&srtobject->sock_cond);
-      GST_OBJECT_UNLOCK (srtobject->element);
+      g_mutex_unlock (&srtobject->sock_lock);
 
       /* notifying caller-added */
       if (srtobject->caller_added_closure != NULL) {
@@ -749,6 +770,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
   gsize bind_sa_len;
   GSocketAddress *bind_addr;
 
+  GST_OBJECT_LOCK (srtobject->element);
+
   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
 
   local_address =
@@ -756,6 +779,8 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
   if (local_address == NULL)
     local_address = GST_SRT_DEFAULT_LOCALADDRESS;
 
+  GST_OBJECT_UNLOCK (srtobject->element);
+
   bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
   bind_sa_len = g_socket_address_get_native_size (bind_addr);
   bind_sa = g_alloca (bind_sa_len);
@@ -881,9 +906,12 @@ gst_srt_object_connect (GstSRTObject * srtobject,
     goto failed;
   }
 
+  GST_OBJECT_LOCK (srtobject->element);
   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
   local_address =
       gst_structure_get_string (srtobject->parameters, "localaddress");
+  GST_OBJECT_UNLOCK (srtobject->element);
+
   /* According to SRT norm, bind local address and port if specified */
   if (local_address != NULL && local_port != 0) {
     gpointer bind_sa;
@@ -983,6 +1011,10 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
   gpointer sa;
   size_t sa_len;
   const gchar *addr_str;
+  guint port;
+  gboolean ret = FALSE;
+
+  GST_OBJECT_LOCK (srtobject->element);
 
   srtobject->opened = FALSE;
 
@@ -1001,7 +1033,6 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
   }
 
   addr_str = gst_uri_get_host (srtobject->uri);
-
   if (addr_str == NULL) {
     addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
     GST_DEBUG_OBJECT (srtobject->element,
@@ -1009,9 +1040,22 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
         " setting listener mode", addr_str);
   }
 
-  socket_address =
-      g_inet_socket_address_new_from_string (addr_str,
-      gst_uri_get_port (srtobject->uri));
+  port = gst_uri_get_port (srtobject->uri);
+
+  GST_DEBUG_OBJECT (srtobject->element,
+      "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
+      srtobject->parameters);
+
+  if (!gst_structure_get_enum (srtobject->parameters,
+          "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
+    GST_WARNING_OBJECT (srtobject->element,
+        "Cannot get connection mode information." " Use default mode");
+    connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
+  }
+
+  GST_OBJECT_UNLOCK (srtobject->element);
+
+  socket_address = g_inet_socket_address_new_from_string (addr_str, port);
 
   if (socket_address == NULL) {
     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
@@ -1033,33 +1077,26 @@ gst_srt_object_open_full (GstSRTObject * srtobject,
     goto out;
   }
 
-  GST_DEBUG_OBJECT (srtobject->element,
-      "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
-      srtobject->parameters);
-
-  if (!gst_structure_get_enum (srtobject->parameters,
-          "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
-    GST_WARNING_OBJECT (srtobject->element,
-        "Cannot get connection mode information." " Use default mode");
-    connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
-  }
-
   srtobject->listener_poll_id = srt_epoll_create ();
 
-  srtobject->opened =
+  ret =
       gst_srt_object_open_connection
       (srtobject, cancellable, connection_mode, sa, sa_len, error);
 
+  GST_OBJECT_LOCK (srtobject->element);
+  srtobject->opened = ret;
+  GST_OBJECT_UNLOCK (srtobject->element);
+
 out:
   g_clear_object (&socket_address);
 
-  return srtobject->opened;
+  return ret;
 }
 
 void
 gst_srt_object_close (GstSRTObject * srtobject)
 {
-  GST_OBJECT_LOCK (srtobject->element);
+  g_mutex_lock (&srtobject->sock_lock);
   if (srtobject->poll_id != SRT_ERROR) {
     srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
   }
@@ -1080,9 +1117,9 @@ gst_srt_object_close (GstSRTObject * srtobject)
   }
   if (srtobject->thread) {
     GThread *thread = g_steal_pointer (&srtobject->thread);
-    GST_OBJECT_UNLOCK (srtobject->element);
+    g_mutex_unlock (&srtobject->sock_lock);
     g_thread_join (thread);
-    GST_OBJECT_LOCK (srtobject->element);
+    g_mutex_lock (&srtobject->sock_lock);
   }
 
   if (srtobject->listener_sock != SRT_INVALID_SOCK) {
@@ -1095,16 +1132,17 @@ gst_srt_object_close (GstSRTObject * srtobject)
 
   if (srtobject->callers) {
     GList *callers = g_steal_pointer (&srtobject->callers);
-    GST_OBJECT_UNLOCK (srtobject->element);
     g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
         srtobject);
-    GST_OBJECT_LOCK (srtobject->element);
     g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
   }
 
   g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
   g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
 
+  g_mutex_unlock (&srtobject->sock_lock);
+
+  GST_OBJECT_LOCK (srtobject->element);
   srtobject->opened = FALSE;
   GST_OBJECT_UNLOCK (srtobject->element);
 }
@@ -1117,15 +1155,14 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
 
   GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
 
-  GST_OBJECT_LOCK (srtobject->element);
+  g_mutex_lock (&srtobject->sock_lock);
   while (!g_cancellable_is_cancelled (cancellable)) {
     ret = (srtobject->callers != NULL);
     if (ret)
       break;
-    g_cond_wait (&srtobject->sock_cond,
-        GST_OBJECT_GET_LOCK (srtobject->element));
+    g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
   }
-  GST_OBJECT_UNLOCK (srtobject->element);
+  g_mutex_unlock (&srtobject->sock_lock);
 
   GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
 
@@ -1145,31 +1182,35 @@ gst_srt_object_read (GstSRTObject * srtobject,
   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
           (srtobject->element)) == GST_URI_SRC, -1);
 
+  GST_OBJECT_LOCK (srtobject->element);
+
   gst_structure_get_enum (srtobject->parameters, "mode",
       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
 
-  if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
-    SRTCaller *caller;
+  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
+          &poll_timeout)) {
+    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
+  }
 
+  GST_OBJECT_UNLOCK (srtobject->element);
+
+  if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
       return -1;
 
-    GST_OBJECT_LOCK (srtobject->element);
-    caller = srtobject->callers->data;
-    if (srtobject->callers)
+    g_mutex_lock (&srtobject->sock_lock);
+    if (srtobject->callers) {
+      SRTCaller *caller = srtobject->callers->data;
       poll_id = caller->poll_id;
-    GST_OBJECT_UNLOCK (srtobject->element);
+    }
+    g_mutex_unlock (&srtobject->sock_lock);
+
     if (poll_id == SRT_ERROR)
       return 0;
   } else {
     poll_id = srtobject->poll_id;
   }
 
-  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
-          &poll_timeout)) {
-    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
-  }
-
   while (!g_cancellable_is_cancelled (cancellable)) {
 
     SRTSOCKET rsock;
@@ -1236,13 +1277,13 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
 
   /* connection is only waited for in listener mode,
    * but there is no harm in raising signal in any case */
-  GST_OBJECT_LOCK (srtobject->element);
+  g_mutex_lock (&srtobject->sock_lock);
   /* however, a race might be harmful ...
    * the cancellation is used as 'flushing' flag here,
    * so make sure it is so detected by the intended part at proper time */
   g_cancellable_cancel (cancellable);
   g_cond_signal (&srtobject->sock_cond);
-  GST_OBJECT_UNLOCK (srtobject->element);
+  g_mutex_unlock (&srtobject->sock_lock);
 }
 
 static gboolean
@@ -1305,7 +1346,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
 {
   GList *callers;
 
-  GST_OBJECT_LOCK (srtobject->element);
+  g_mutex_lock (&srtobject->sock_lock);
   callers = srtobject->callers;
   while (callers != NULL) {
     gssize len = 0;
@@ -1317,8 +1358,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
     callers = callers->next;
 
     if (g_cancellable_is_cancelled (cancellable)) {
-      GST_OBJECT_UNLOCK (srtobject->element);
-      return -1;
+      goto cancelled;
     }
 
     if (!caller->sent_headers) {
@@ -1352,9 +1392,12 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
     srt_caller_free (caller);
   }
 
-  GST_OBJECT_UNLOCK (srtobject->element);
-
+  g_mutex_unlock (&srtobject->sock_lock);
   return mapinfo->size;
+
+cancelled:
+  g_mutex_unlock (&srtobject->sock_lock);
+  return -1;
 }
 
 static gssize
@@ -1367,10 +1410,12 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
   const guint8 *msg = mapinfo->data;
   gint payload_size, optlen = 1;
 
+  GST_OBJECT_LOCK (srtobject->element);
   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
           &poll_timeout)) {
     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
   }
+  GST_OBJECT_UNLOCK (srtobject->element);
 
   if (!srtobject->sent_headers) {
     if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
@@ -1443,16 +1488,20 @@ gst_srt_object_write (GstSRTObject * srtobject,
 {
   gssize len = 0;
   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
+  gboolean wait_for_connection;
 
   /* Only sink element can write data */
   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
           (srtobject->element)) == GST_URI_SINK, -1);
 
+  GST_OBJECT_LOCK (srtobject->element);
   gst_structure_get_enum (srtobject->parameters, "mode",
       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
+  wait_for_connection = srtobject->wait_for_connection;
+  GST_OBJECT_UNLOCK (srtobject->element);
 
   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
-    if (srtobject->wait_for_connection) {
+    if (wait_for_connection) {
       if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
         return -1;
     }
@@ -1534,7 +1583,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
   GstStructure *s = NULL;
   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
 
-  GST_OBJECT_LOCK (srtobject->element);
+  g_mutex_lock (&srtobject->sock_lock);
   if (srtobject->sock != SRT_INVALID_SOCK) {
     s = get_stats_for_srtsock (srtobject->sock, is_sender);
     goto done;
@@ -1564,7 +1613,7 @@ gst_srt_object_get_stats (GstSRTObject * srtobject)
   }
 
 done:
-  GST_OBJECT_UNLOCK (srtobject->element);
+  g_mutex_unlock (&srtobject->sock_lock);
 
   return s;
 }
index f014434..754ebf2 100644 (file)
@@ -56,14 +56,16 @@ struct _GstSRTObject
   gint                          poll_id;
   gboolean                      sent_headers;
 
-  GCond                         sock_cond;
-
   GTask                        *listener_task;
   SRTSOCKET                     listener_sock;
   gint                          listener_poll_id;
 
   GThread                      *thread;
 
+  /* Protects the list of callers */
+  GMutex                        sock_lock;
+  GCond                         sock_cond;
+
   GList                        *callers;
 
   GClosure                     *caller_added_closure;
index 76a5882..9a7df63 100644 (file)
@@ -374,8 +374,13 @@ gst_srt_sink_uri_set_uri (GstURIHandler * handler,
     const gchar * uri, GError ** error)
 {
   GstSRTSink *self = GST_SRT_SINK (handler);
+  gboolean ret;
 
-  return gst_srt_object_set_uri (self->srtobject, uri, error);
+  GST_OBJECT_LOCK (self);
+  ret = gst_srt_object_set_uri (self->srtobject, uri, error);
+  GST_OBJECT_UNLOCK (self);
+
+  return ret;
 }
 
 static void
index 7bf03d6..14c1dbb 100644 (file)
@@ -342,8 +342,13 @@ gst_srt_src_uri_set_uri (GstURIHandler * handler,
     const gchar * uri, GError ** error)
 {
   GstSRTSrc *self = GST_SRT_SRC (handler);
+  gboolean ret;
 
-  return gst_srt_object_set_uri (self->srtobject, uri, error);
+  GST_OBJECT_LOCK (self);
+  ret = gst_srt_object_set_uri (self->srtobject, uri, error);
+  GST_OBJECT_UNLOCK (self);
+
+  return ret;
 }
 
 static void