filter: Release lock in filter functions
authorWim Taymans <wtaymans@redhat.com>
Thu, 10 Jul 2014 09:32:20 +0000 (11:32 +0200)
committerWim Taymans <wtaymans@redhat.com>
Thu, 10 Jul 2014 09:36:55 +0000 (11:36 +0200)
Release the object lock before calling the filter functions. We need to
keep a cookie to detect when the list changed during the filter
callback. We also keep a hashtable to make sure we only call the filter
function once for each object in case of concurrent modification.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=732950

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-server.c
gst/rtsp-server/rtsp-session-pool.c
gst/rtsp-server/rtsp-session.c
gst/rtsp-server/rtsp-stream.c

index 43c671a..0fd751a 100644 (file)
@@ -83,6 +83,7 @@ struct _GstRTSPClientPrivate
 
   GList *transports;
   GList *sessions;
+  guint sessions_cookie;
 
   gboolean drop_backlog;
 };
@@ -305,6 +306,7 @@ client_watch_session (GstRTSPClient * client, GstRTSPSession * session)
     GST_INFO ("watching session %p", session);
 
     priv->sessions = g_list_prepend (priv->sessions, g_object_ref (session));
+    priv->sessions_cookie++;
 
     /* connect removed session handler, it will be disconnected when the last
      * session gets removed  */
@@ -334,12 +336,12 @@ client_unwatch_session (GstRTSPClient * client, GstRTSPSession * session,
   }
 
   priv->sessions = g_list_delete_link (priv->sessions, link);
+  priv->sessions_cookie++;
 
   /* if this was the last session, disconnect the handler.
    * This will also drop the extra client ref */
   if (!priv->sessions) {
-    g_signal_handler_disconnect (priv->session_pool,
-        priv->session_removed_id);
+    g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id);
     priv->session_removed_id = 0;
   }
 
@@ -3455,29 +3457,50 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
 {
   GstRTSPClientPrivate *priv;
   GList *result, *walk, *next;
+  GHashTable *visited;
+  guint cookie;
 
   g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), NULL);
 
   priv = client->priv;
 
   result = NULL;
+  if (func)
+    visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
 
   g_mutex_lock (&priv->lock);
+restart:
+  cookie = priv->sessions_cookie;
   for (walk = priv->sessions; walk; walk = next) {
     GstRTSPSession *sess = walk->data;
     GstRTSPFilterResult res;
+    gboolean changed;
 
     next = g_list_next (walk);
 
-    if (func)
+    if (func) {
+      /* only visit each session once */
+      if (g_hash_table_contains (visited, sess))
+        continue;
+
+      g_hash_table_add (visited, g_object_ref (sess));
+      g_mutex_unlock (&priv->lock);
+
       res = func (client, sess, user_data);
-    else
+
+      g_mutex_lock (&priv->lock);
+    } else
       res = GST_RTSP_FILTER_REF;
 
+    changed = (cookie != priv->sessions_cookie);
+
     switch (res) {
       case GST_RTSP_FILTER_REMOVE:
-        /* stop watching the session and pretent it went away */
-        client_unwatch_session (client, sess, walk);
+        /* stop watching the session and pretend it went away, if the list was
+         * changed, we can't use the current list position, try to see if we
+         * still have the session */
+        client_unwatch_session (client, sess, changed ? NULL : walk);
+        cookie = priv->sessions_cookie;
         break;
       case GST_RTSP_FILTER_REF:
         result = g_list_prepend (result, g_object_ref (sess));
@@ -3486,8 +3509,13 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
       default:
         break;
     }
+    if (changed)
+      goto restart;
   }
   g_mutex_unlock (&priv->lock);
 
+  if (func)
+    g_hash_table_unref (visited);
+
   return result;
 }
index a5ea95a..84a7d50 100644 (file)
@@ -90,6 +90,7 @@ struct _GstRTSPServerPrivate
 
   /* the clients that are connected */
   GList *clients;
+  guint clients_cookie;
 };
 
 #define DEFAULT_ADDRESS         "0.0.0.0"
@@ -999,6 +1000,7 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
 
   GST_RTSP_SERVER_LOCK (server);
   priv->clients = g_list_remove (priv->clients, ctx);
+  priv->clients_cookie++;
   GST_RTSP_SERVER_UNLOCK (server);
 
   if (ctx->thread) {
@@ -1050,6 +1052,7 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
 
   g_signal_connect (client, "closed", (GCallback) unmanage_client, cctx);
   priv->clients = g_list_prepend (priv->clients, cctx);
+  priv->clients_cookie++;
 
   gst_rtsp_client_attach (client, mainctx);
 
@@ -1361,38 +1364,62 @@ gst_rtsp_server_client_filter (GstRTSPServer * server,
 {
   GstRTSPServerPrivate *priv;
   GList *result, *walk, *next;
+  GHashTable *visited;
+  guint cookie;
 
   g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
 
   priv = server->priv;
 
   result = NULL;
+  if (func)
+    visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
 
   GST_RTSP_SERVER_LOCK (server);
+restart:
+  cookie = priv->clients_cookie;
   for (walk = priv->clients; walk; walk = next) {
     ClientContext *cctx = walk->data;
+    GstRTSPClient *client = cctx->client;
     GstRTSPFilterResult res;
+    gboolean changed;
 
     next = g_list_next (walk);
 
-    if (func)
-      res = func (server, cctx->client, user_data);
-    else
+    if (func) {
+      /* only visit each media once */
+      if (g_hash_table_contains (visited, client))
+        continue;
+
+      g_hash_table_add (visited, g_object_ref (client));
+      GST_RTSP_SERVER_UNLOCK (server);
+
+      res = func (server, client, user_data);
+
+      GST_RTSP_SERVER_LOCK (server);
+    } else
       res = GST_RTSP_FILTER_REF;
 
+    changed = (cookie != priv->clients_cookie);
+
     switch (res) {
       case GST_RTSP_FILTER_REMOVE:
         /* remove client, FIXME */
         break;
       case GST_RTSP_FILTER_REF:
-        result = g_list_prepend (result, g_object_ref (cctx->client));
+        result = g_list_prepend (result, g_object_ref (client));
         break;
       case GST_RTSP_FILTER_KEEP:
       default:
         break;
     }
+    if (changed)
+      goto restart;
   }
   GST_RTSP_SERVER_UNLOCK (server);
 
+  if (func)
+    g_hash_table_unref (visited);
+
   return result;
 }
index a7aa5b2..7699196 100644 (file)
@@ -50,6 +50,7 @@ struct _GstRTSPSessionPoolPrivate
   GMutex lock;                  /* protects everything in this struct */
   guint max_sessions;
   GHashTable *sessions;
+  guint sessions_cookie;
 };
 
 #define DEFAULT_MAX_SESSIONS 0
@@ -394,6 +395,7 @@ gst_rtsp_session_pool_create (GstRTSPSessionPool * pool)
       g_object_ref (result);
       g_hash_table_insert (priv->sessions,
           (gchar *) gst_rtsp_session_get_sessionid (result), result);
+      priv->sessions_cookie++;
     }
     g_mutex_unlock (&priv->lock);
 
@@ -455,6 +457,7 @@ gst_rtsp_session_pool_remove (GstRTSPSessionPool * pool, GstRTSPSession * sess)
       g_hash_table_remove (priv->sessions,
       gst_rtsp_session_get_sessionid (sess));
   if (found) {
+    priv->sessions_cookie++;
     g_signal_emit (pool, gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED],
         0, sess);
   }
@@ -511,44 +514,13 @@ gst_rtsp_session_pool_cleanup (GstRTSPSessionPool * pool)
   result =
       g_hash_table_foreach_remove (priv->sessions, (GHRFunc) cleanup_func,
       &data);
+  if (result > 0)
+    priv->sessions_cookie++;
   g_mutex_unlock (&priv->lock);
 
   return result;
 }
 
-typedef struct
-{
-  GstRTSPSessionPool *pool;
-  GstRTSPSessionPoolFilterFunc func;
-  gpointer user_data;
-  GList *list;
-} FilterData;
-
-static gboolean
-filter_func (gchar * sessionid, GstRTSPSession * sess, FilterData * data)
-{
-  GstRTSPFilterResult res;
-
-  if (data->func)
-    res = data->func (data->pool, sess, data->user_data);
-  else
-    res = GST_RTSP_FILTER_REF;
-
-  switch (res) {
-    case GST_RTSP_FILTER_REMOVE:
-      g_signal_emit (data->pool,
-          gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, sess);
-      return TRUE;
-    case GST_RTSP_FILTER_REF:
-      /* keep ref */
-      data->list = g_list_prepend (data->list, g_object_ref (sess));
-      /* fallthrough */
-    default:
-    case GST_RTSP_FILTER_KEEP:
-      return FALSE;
-  }
-}
-
 /**
  * gst_rtsp_session_pool_filter:
  * @pool: a #GstRTSPSessionPool
@@ -580,22 +552,73 @@ gst_rtsp_session_pool_filter (GstRTSPSessionPool * pool,
     GstRTSPSessionPoolFilterFunc func, gpointer user_data)
 {
   GstRTSPSessionPoolPrivate *priv;
-  FilterData data;
+  GHashTableIter iter;
+  gpointer key, value;
+  GList *result;
+  GHashTable *visited;
+  guint cookie;
 
   g_return_val_if_fail (GST_IS_RTSP_SESSION_POOL (pool), NULL);
 
   priv = pool->priv;
 
-  data.pool = pool;
-  data.func = func;
-  data.user_data = user_data;
-  data.list = NULL;
+  result = NULL;
+  if (func)
+    visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
 
   g_mutex_lock (&priv->lock);
-  g_hash_table_foreach_remove (priv->sessions, (GHRFunc) filter_func, &data);
+restart:
+  g_hash_table_iter_init (&iter, priv->sessions);
+  cookie = priv->sessions_cookie;
+  while (g_hash_table_iter_next (&iter, &key, &value)) {
+    GstRTSPSession *session = value;
+    GstRTSPFilterResult res;
+    gboolean changed;
+
+    if (func) {
+      /* only visit each session once */
+      if (g_hash_table_contains (visited, session))
+        continue;
+
+      g_hash_table_add (visited, g_object_ref (session));
+      g_mutex_unlock (&priv->lock);
+
+      res = func (pool, session, user_data);
+
+      g_mutex_lock (&priv->lock);
+    } else
+      res = GST_RTSP_FILTER_REF;
+
+    changed = (cookie != priv->sessions_cookie);
+
+    switch (res) {
+      case GST_RTSP_FILTER_REMOVE:
+        g_signal_emit (pool,
+            gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, session);
+
+        if (changed)
+          g_hash_table_remove (priv->sessions, key);
+        else
+          g_hash_table_iter_remove (&iter);
+        cookie = ++priv->sessions_cookie;
+        break;
+      case GST_RTSP_FILTER_REF:
+        /* keep ref */
+        result = g_list_prepend (result, g_object_ref (session));
+        break;
+      case GST_RTSP_FILTER_KEEP:
+      default:
+        break;
+    }
+    if (changed)
+      goto restart;
+  }
   g_mutex_unlock (&priv->lock);
 
-  return data.list;
+  if (func)
+    g_hash_table_unref (visited);
+
+  return result;
 }
 
 typedef struct
index 3463b32..372746a 100644 (file)
@@ -62,6 +62,7 @@ struct _GstRTSPSessionPrivate
   gint expire_count;
 
   GList *medias;
+  guint medias_cookie;
 };
 
 #undef DEBUG
@@ -238,6 +239,7 @@ gst_rtsp_session_manage_media (GstRTSPSession * sess, const gchar * path,
 
   g_mutex_lock (&priv->lock);
   priv->medias = g_list_prepend (priv->medias, result);
+  priv->medias_cookie++;
   g_mutex_unlock (&priv->lock);
 
   GST_INFO ("manage new media %p in session %p", media, result);
@@ -269,8 +271,10 @@ gst_rtsp_session_release_media (GstRTSPSession * sess,
 
   g_mutex_lock (&priv->lock);
   find = g_list_find (priv->medias, media);
-  if (find)
+  if (find) {
     priv->medias = g_list_delete_link (priv->medias, find);
+    priv->medias_cookie++;
+  }
   more = (priv->medias != NULL);
   g_mutex_unlock (&priv->lock);
 
@@ -359,29 +363,51 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
 {
   GstRTSPSessionPrivate *priv;
   GList *result, *walk, *next;
+  GHashTable *visited;
+  guint cookie;
 
   g_return_val_if_fail (GST_IS_RTSP_SESSION (sess), NULL);
 
   priv = sess->priv;
 
   result = NULL;
+  if (func)
+    visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
 
   g_mutex_lock (&priv->lock);
+restart:
+  cookie = priv->medias_cookie;
   for (walk = priv->medias; walk; walk = next) {
     GstRTSPSessionMedia *media = walk->data;
     GstRTSPFilterResult res;
+    gboolean changed;
 
     next = g_list_next (walk);
 
-    if (func)
+    if (func) {
+      /* only visit each media once */
+      if (g_hash_table_contains (visited, media))
+        continue;
+
+      g_hash_table_add (visited, g_object_ref (media));
+      g_mutex_unlock (&priv->lock);
+
       res = func (sess, media, user_data);
-    else
+
+      g_mutex_lock (&priv->lock);
+    } else
       res = GST_RTSP_FILTER_REF;
 
+    changed = (cookie != priv->medias_cookie);
+
     switch (res) {
       case GST_RTSP_FILTER_REMOVE:
+        if (changed)
+          priv->medias = g_list_remove (priv->medias, media);
+        else
+          priv->medias = g_list_delete_link (priv->medias, walk);
+        cookie = ++priv->medias_cookie;
         g_object_unref (media);
-        priv->medias = g_list_delete_link (priv->medias, walk);
         break;
       case GST_RTSP_FILTER_REF:
         result = g_list_prepend (result, g_object_ref (media));
@@ -390,9 +416,14 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
       default:
         break;
     }
+    if (changed)
+      goto restart;
   }
   g_mutex_unlock (&priv->lock);
 
+  if (func)
+    g_hash_table_unref (visited);
+
   return result;
 }
 
index 02c8274..942b837 100644 (file)
@@ -124,8 +124,9 @@ struct _GstRTSPStreamPrivate
   /* transports we stream to */
   guint n_active;
   GList *transports;
-  gboolean tr_changed;
+  guint transports_cookie;
   GList *tr_cache;
+  guint tr_cache_cookie;
 
   gint dscp_qos;
 
@@ -1503,13 +1504,13 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
 
   g_mutex_lock (&priv->lock);
-  if (priv->tr_changed) {
+  if (priv->tr_cache_cookie != priv->transports_cookie) {
     clear_tr_cache (priv);
     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
     }
-    priv->tr_changed = FALSE;
+    priv->tr_cache_cookie = priv->transports_cookie;
   }
   g_mutex_unlock (&priv->lock);
 
@@ -2268,7 +2269,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
         priv->transports = g_list_remove (priv->transports, trans);
       }
-      priv->tr_changed = TRUE;
+      priv->transports_cookie++;
       break;
     }
     case GST_RTSP_LOWER_TRANS_TCP:
@@ -2279,7 +2280,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
         GST_INFO ("removing TCP %s", tr->destination);
         priv->transports = g_list_remove (priv->transports, trans);
       }
-      priv->tr_changed = TRUE;
+      priv->transports_cookie++;
       break;
     default:
       goto unknown_transport;
@@ -2497,25 +2498,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
 {
   GstRTSPStreamPrivate *priv;
   GList *result, *walk, *next;
+  GHashTable *visited;
+  guint cookie;
 
   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
 
   priv = stream->priv;
 
   result = NULL;
+  if (func)
+    visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
 
   g_mutex_lock (&priv->lock);
+restart:
+  cookie = priv->transports_cookie;
   for (walk = priv->transports; walk; walk = next) {
     GstRTSPStreamTransport *trans = walk->data;
     GstRTSPFilterResult res;
+    gboolean changed;
 
     next = g_list_next (walk);
 
-    if (func)
+    if (func) {
+      /* only visit each transport once */
+      if (g_hash_table_contains (visited, trans))
+        continue;
+
+      g_hash_table_add (visited, g_object_ref (trans));
+      g_mutex_unlock (&priv->lock);
+
       res = func (stream, trans, user_data);
-    else
+
+      g_mutex_lock (&priv->lock);
+    } else
       res = GST_RTSP_FILTER_REF;
 
+    changed = (cookie != priv->transports_cookie);
+
     switch (res) {
       case GST_RTSP_FILTER_REMOVE:
         update_transport (stream, trans, FALSE);
@@ -2527,9 +2546,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
       default:
         break;
     }
+    if (changed)
+      goto restart;
   }
   g_mutex_unlock (&priv->lock);
 
+  if (func)
+    g_hash_table_unref (visited);
+
   return result;
 }