From 945c93fde09b461d4b810846e2a9443f915e6507 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 10 Jul 2014 11:32:20 +0200 Subject: [PATCH] filter: Release lock in filter functions Release the object lock before calling the filter functions. We need to keep a cookie to detect when the list changed during the filter callback. We also keep a hashtable to make sure we only call the filter function once for each object in case of concurrent modification. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=732950 --- gst/rtsp-server/rtsp-client.c | 40 +++++++++++--- gst/rtsp-server/rtsp-server.c | 35 ++++++++++-- gst/rtsp-server/rtsp-session-pool.c | 103 ++++++++++++++++++++++-------------- gst/rtsp-server/rtsp-session.c | 39 ++++++++++++-- gst/rtsp-server/rtsp-stream.c | 38 ++++++++++--- 5 files changed, 194 insertions(+), 61 deletions(-) diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 43c671a..0fd751a 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -83,6 +83,7 @@ struct _GstRTSPClientPrivate GList *transports; GList *sessions; + guint sessions_cookie; gboolean drop_backlog; }; @@ -305,6 +306,7 @@ client_watch_session (GstRTSPClient * client, GstRTSPSession * session) GST_INFO ("watching session %p", session); priv->sessions = g_list_prepend (priv->sessions, g_object_ref (session)); + priv->sessions_cookie++; /* connect removed session handler, it will be disconnected when the last * session gets removed */ @@ -334,12 +336,12 @@ client_unwatch_session (GstRTSPClient * client, GstRTSPSession * session, } priv->sessions = g_list_delete_link (priv->sessions, link); + priv->sessions_cookie++; /* if this was the last session, disconnect the handler. * This will also drop the extra client ref */ if (!priv->sessions) { - g_signal_handler_disconnect (priv->session_pool, - priv->session_removed_id); + g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id); priv->session_removed_id = 0; } @@ -3455,29 +3457,50 @@ gst_rtsp_client_session_filter (GstRTSPClient * client, { GstRTSPClientPrivate *priv; GList *result, *walk, *next; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), NULL); priv = client->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->sessions_cookie; for (walk = priv->sessions; walk; walk = next) { GstRTSPSession *sess = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each session once */ + if (g_hash_table_contains (visited, sess)) + continue; + + g_hash_table_add (visited, g_object_ref (sess)); + g_mutex_unlock (&priv->lock); + res = func (client, sess, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->sessions_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: - /* stop watching the session and pretent it went away */ - client_unwatch_session (client, sess, walk); + /* stop watching the session and pretend it went away, if the list was + * changed, we can't use the current list position, try to see if we + * still have the session */ + client_unwatch_session (client, sess, changed ? NULL : walk); + cookie = priv->sessions_cookie; break; case GST_RTSP_FILTER_REF: result = g_list_prepend (result, g_object_ref (sess)); @@ -3486,8 +3509,13 @@ gst_rtsp_client_session_filter (GstRTSPClient * client, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; } diff --git a/gst/rtsp-server/rtsp-server.c b/gst/rtsp-server/rtsp-server.c index a5ea95a..84a7d50 100644 --- a/gst/rtsp-server/rtsp-server.c +++ b/gst/rtsp-server/rtsp-server.c @@ -90,6 +90,7 @@ struct _GstRTSPServerPrivate /* the clients that are connected */ GList *clients; + guint clients_cookie; }; #define DEFAULT_ADDRESS "0.0.0.0" @@ -999,6 +1000,7 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx) GST_RTSP_SERVER_LOCK (server); priv->clients = g_list_remove (priv->clients, ctx); + priv->clients_cookie++; GST_RTSP_SERVER_UNLOCK (server); if (ctx->thread) { @@ -1050,6 +1052,7 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client) g_signal_connect (client, "closed", (GCallback) unmanage_client, cctx); priv->clients = g_list_prepend (priv->clients, cctx); + priv->clients_cookie++; gst_rtsp_client_attach (client, mainctx); @@ -1361,38 +1364,62 @@ gst_rtsp_server_client_filter (GstRTSPServer * server, { GstRTSPServerPrivate *priv; GList *result, *walk, *next; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL); priv = server->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); GST_RTSP_SERVER_LOCK (server); +restart: + cookie = priv->clients_cookie; for (walk = priv->clients; walk; walk = next) { ClientContext *cctx = walk->data; + GstRTSPClient *client = cctx->client; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) - res = func (server, cctx->client, user_data); - else + if (func) { + /* only visit each media once */ + if (g_hash_table_contains (visited, client)) + continue; + + g_hash_table_add (visited, g_object_ref (client)); + GST_RTSP_SERVER_UNLOCK (server); + + res = func (server, client, user_data); + + GST_RTSP_SERVER_LOCK (server); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->clients_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: /* remove client, FIXME */ break; case GST_RTSP_FILTER_REF: - result = g_list_prepend (result, g_object_ref (cctx->client)); + result = g_list_prepend (result, g_object_ref (client)); break; case GST_RTSP_FILTER_KEEP: default: break; } + if (changed) + goto restart; } GST_RTSP_SERVER_UNLOCK (server); + if (func) + g_hash_table_unref (visited); + return result; } diff --git a/gst/rtsp-server/rtsp-session-pool.c b/gst/rtsp-server/rtsp-session-pool.c index a7aa5b2..7699196 100644 --- a/gst/rtsp-server/rtsp-session-pool.c +++ b/gst/rtsp-server/rtsp-session-pool.c @@ -50,6 +50,7 @@ struct _GstRTSPSessionPoolPrivate GMutex lock; /* protects everything in this struct */ guint max_sessions; GHashTable *sessions; + guint sessions_cookie; }; #define DEFAULT_MAX_SESSIONS 0 @@ -394,6 +395,7 @@ gst_rtsp_session_pool_create (GstRTSPSessionPool * pool) g_object_ref (result); g_hash_table_insert (priv->sessions, (gchar *) gst_rtsp_session_get_sessionid (result), result); + priv->sessions_cookie++; } g_mutex_unlock (&priv->lock); @@ -455,6 +457,7 @@ gst_rtsp_session_pool_remove (GstRTSPSessionPool * pool, GstRTSPSession * sess) g_hash_table_remove (priv->sessions, gst_rtsp_session_get_sessionid (sess)); if (found) { + priv->sessions_cookie++; g_signal_emit (pool, gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, sess); } @@ -511,44 +514,13 @@ gst_rtsp_session_pool_cleanup (GstRTSPSessionPool * pool) result = g_hash_table_foreach_remove (priv->sessions, (GHRFunc) cleanup_func, &data); + if (result > 0) + priv->sessions_cookie++; g_mutex_unlock (&priv->lock); return result; } -typedef struct -{ - GstRTSPSessionPool *pool; - GstRTSPSessionPoolFilterFunc func; - gpointer user_data; - GList *list; -} FilterData; - -static gboolean -filter_func (gchar * sessionid, GstRTSPSession * sess, FilterData * data) -{ - GstRTSPFilterResult res; - - if (data->func) - res = data->func (data->pool, sess, data->user_data); - else - res = GST_RTSP_FILTER_REF; - - switch (res) { - case GST_RTSP_FILTER_REMOVE: - g_signal_emit (data->pool, - gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, sess); - return TRUE; - case GST_RTSP_FILTER_REF: - /* keep ref */ - data->list = g_list_prepend (data->list, g_object_ref (sess)); - /* fallthrough */ - default: - case GST_RTSP_FILTER_KEEP: - return FALSE; - } -} - /** * gst_rtsp_session_pool_filter: * @pool: a #GstRTSPSessionPool @@ -580,22 +552,73 @@ gst_rtsp_session_pool_filter (GstRTSPSessionPool * pool, GstRTSPSessionPoolFilterFunc func, gpointer user_data) { GstRTSPSessionPoolPrivate *priv; - FilterData data; + GHashTableIter iter; + gpointer key, value; + GList *result; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_SESSION_POOL (pool), NULL); priv = pool->priv; - data.pool = pool; - data.func = func; - data.user_data = user_data; - data.list = NULL; + result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); - g_hash_table_foreach_remove (priv->sessions, (GHRFunc) filter_func, &data); +restart: + g_hash_table_iter_init (&iter, priv->sessions); + cookie = priv->sessions_cookie; + while (g_hash_table_iter_next (&iter, &key, &value)) { + GstRTSPSession *session = value; + GstRTSPFilterResult res; + gboolean changed; + + if (func) { + /* only visit each session once */ + if (g_hash_table_contains (visited, session)) + continue; + + g_hash_table_add (visited, g_object_ref (session)); + g_mutex_unlock (&priv->lock); + + res = func (pool, session, user_data); + + g_mutex_lock (&priv->lock); + } else + res = GST_RTSP_FILTER_REF; + + changed = (cookie != priv->sessions_cookie); + + switch (res) { + case GST_RTSP_FILTER_REMOVE: + g_signal_emit (pool, + gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, session); + + if (changed) + g_hash_table_remove (priv->sessions, key); + else + g_hash_table_iter_remove (&iter); + cookie = ++priv->sessions_cookie; + break; + case GST_RTSP_FILTER_REF: + /* keep ref */ + result = g_list_prepend (result, g_object_ref (session)); + break; + case GST_RTSP_FILTER_KEEP: + default: + break; + } + if (changed) + goto restart; + } g_mutex_unlock (&priv->lock); - return data.list; + if (func) + g_hash_table_unref (visited); + + return result; } typedef struct diff --git a/gst/rtsp-server/rtsp-session.c b/gst/rtsp-server/rtsp-session.c index 3463b32..372746a 100644 --- a/gst/rtsp-server/rtsp-session.c +++ b/gst/rtsp-server/rtsp-session.c @@ -62,6 +62,7 @@ struct _GstRTSPSessionPrivate gint expire_count; GList *medias; + guint medias_cookie; }; #undef DEBUG @@ -238,6 +239,7 @@ gst_rtsp_session_manage_media (GstRTSPSession * sess, const gchar * path, g_mutex_lock (&priv->lock); priv->medias = g_list_prepend (priv->medias, result); + priv->medias_cookie++; g_mutex_unlock (&priv->lock); GST_INFO ("manage new media %p in session %p", media, result); @@ -269,8 +271,10 @@ gst_rtsp_session_release_media (GstRTSPSession * sess, g_mutex_lock (&priv->lock); find = g_list_find (priv->medias, media); - if (find) + if (find) { priv->medias = g_list_delete_link (priv->medias, find); + priv->medias_cookie++; + } more = (priv->medias != NULL); g_mutex_unlock (&priv->lock); @@ -359,29 +363,51 @@ gst_rtsp_session_filter (GstRTSPSession * sess, { GstRTSPSessionPrivate *priv; GList *result, *walk, *next; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_SESSION (sess), NULL); priv = sess->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->medias_cookie; for (walk = priv->medias; walk; walk = next) { GstRTSPSessionMedia *media = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each media once */ + if (g_hash_table_contains (visited, media)) + continue; + + g_hash_table_add (visited, g_object_ref (media)); + g_mutex_unlock (&priv->lock); + res = func (sess, media, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->medias_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: + if (changed) + priv->medias = g_list_remove (priv->medias, media); + else + priv->medias = g_list_delete_link (priv->medias, walk); + cookie = ++priv->medias_cookie; g_object_unref (media); - priv->medias = g_list_delete_link (priv->medias, walk); break; case GST_RTSP_FILTER_REF: result = g_list_prepend (result, g_object_ref (media)); @@ -390,9 +416,14 @@ gst_rtsp_session_filter (GstRTSPSession * sess, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; } diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 02c8274..942b837 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -124,8 +124,9 @@ struct _GstRTSPStreamPrivate /* transports we stream to */ guint n_active; GList *transports; - gboolean tr_changed; + guint transports_cookie; GList *tr_cache; + guint tr_cache_cookie; gint dscp_qos; @@ -1503,13 +1504,13 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0]; g_mutex_lock (&priv->lock); - if (priv->tr_changed) { + if (priv->tr_cache_cookie != priv->transports_cookie) { clear_tr_cache (priv); for (walk = priv->transports; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr)); } - priv->tr_changed = FALSE; + priv->tr_cache_cookie = priv->transports_cookie; } g_mutex_unlock (&priv->lock); @@ -2268,7 +2269,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL); priv->transports = g_list_remove (priv->transports, trans); } - priv->tr_changed = TRUE; + priv->transports_cookie++; break; } case GST_RTSP_LOWER_TRANS_TCP: @@ -2279,7 +2280,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, GST_INFO ("removing TCP %s", tr->destination); priv->transports = g_list_remove (priv->transports, trans); } - priv->tr_changed = TRUE; + priv->transports_cookie++; break; default: goto unknown_transport; @@ -2497,25 +2498,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, { GstRTSPStreamPrivate *priv; GList *result, *walk, *next; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->transports_cookie; for (walk = priv->transports; walk; walk = next) { GstRTSPStreamTransport *trans = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each transport once */ + if (g_hash_table_contains (visited, trans)) + continue; + + g_hash_table_add (visited, g_object_ref (trans)); + g_mutex_unlock (&priv->lock); + res = func (stream, trans, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->transports_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: update_transport (stream, trans, FALSE); @@ -2527,9 +2546,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; } -- 2.7.4