rtsp-server: Limit the number of threads per server instance
authorOlivier CrĂȘte <olivier.crete@collabora.com>
Tue, 19 Feb 2013 18:19:41 +0000 (13:19 -0500)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 11 Mar 2013 10:07:20 +0000 (11:07 +0100)
If we exceed the maximum, just round robin the clients over the existing
threads.

configure.ac
gst/rtsp-server/rtsp-server.c
tests/check/gst/rtspserver.c

index 2728f15..0a07f73 100644 (file)
@@ -106,7 +106,7 @@ dnl *** checks for library functions ***
 dnl *** checks for dependancy libraries ***
 
 dnl GLib is required (GStreamer is ok with GLib-2.8, but we want at least 2.10)
-GLIB_REQ=2.10.0
+GLIB_REQ=2.32.0
 AC_SUBST([GLIB_REQ])
 AG_GST_GLIB_CHECK([$GLIB_REQ])
 
index 9ab709e..ccfcf49 100644 (file)
@@ -53,6 +53,7 @@ struct _GstRTSPServerPrivate
 
   /* the clients that are connected */
   GList *clients;
+  GQueue loops;                 /* the main loops used in the threads */
 };
 
 #define DEFAULT_ADDRESS         "0.0.0.0"
@@ -93,6 +94,7 @@ GST_DEBUG_CATEGORY_STATIC (rtsp_server_debug);
 #define GST_CAT_DEFAULT rtsp_server_debug
 
 typedef struct _ClientContext ClientContext;
+typedef struct _Loop Loop;
 
 static guint gst_rtsp_server_signals[SIGNAL_LAST] = { 0 };
 
@@ -102,7 +104,7 @@ static void gst_rtsp_server_set_property (GObject * object, guint propid,
     const GValue * value, GParamSpec * pspec);
 static void gst_rtsp_server_finalize (GObject * object);
 
-static gpointer do_loop (ClientContext * ctx);
+static gpointer do_loop (Loop * loop);
 static GstRTSPClient *default_create_client (GstRTSPServer * server);
 static gboolean default_accept_client (GstRTSPServer * server,
     GstRTSPClient * client, GSocket * socket, GError ** error);
@@ -231,6 +233,7 @@ gst_rtsp_server_init (GstRTSPServer * server)
   priv->session_pool = gst_rtsp_session_pool_new ();
   priv->mount_points = gst_rtsp_mount_points_new ();
   priv->max_threads = DEFAULT_MAX_THREADS;
+  g_queue_init (&priv->loops);
 }
 
 static void
@@ -912,34 +915,95 @@ close_error:
   }
 }
 
+struct _Loop
+{
+  gint refcnt;
+
+  GstRTSPServer *server;
+  GMainLoop *mainloop;
+  GMainContext *mainctx;
+};
+
+/* must be called with the lock held */
+static void
+loop_unref (Loop * loop)
+{
+  GstRTSPServer *server = loop->server;
+  GstRTSPServerPrivate *priv = server->priv;
+
+  loop->refcnt--;
+
+  if (loop->refcnt <= 0) {
+    g_queue_remove (&priv->loops, loop);
+    g_main_loop_quit (loop->mainloop);
+  }
+}
+
 struct _ClientContext
 {
   GstRTSPServer *server;
-  GMainLoop *loop;
+  Loop *loop;
   GstRTSPClient *client;
 };
 
-static void
+static gboolean
 free_client_context (ClientContext * ctx)
 {
+  GST_RTSP_SERVER_LOCK (ctx->server);
   if (ctx->loop)
-    g_main_loop_unref (ctx->loop);
+    loop_unref (ctx->loop);
+  GST_RTSP_SERVER_UNLOCK (ctx->server);
+
   g_object_unref (ctx->client);
   g_slice_free (ClientContext, ctx);
+
+  return G_SOURCE_REMOVE;
 }
 
 static gpointer
-do_loop (ClientContext * ctx)
+do_loop (Loop * loop)
 {
   GST_INFO ("enter mainloop");
-  g_main_loop_run (ctx->loop);
+  g_main_loop_run (loop->mainloop);
   GST_INFO ("exit mainloop");
 
-  free_client_context (ctx);
+  g_main_context_unref (loop->mainctx);
+  g_main_loop_unref (loop->mainloop);
+  g_object_unref (loop->server);
+  g_slice_free (Loop, loop);
 
   return NULL;
 }
 
+/* Must be called with lock held */
+
+static Loop *
+gst_rtsp_server_get_main_loop (GstRTSPServer * server)
+{
+  GstRTSPServerPrivate *priv = server->priv;
+  Loop *loop;
+
+  if (priv->max_threads > 0 &&
+      g_queue_get_length (&priv->loops) >= priv->max_threads) {
+    loop = g_queue_pop_head (&priv->loops);
+    loop->refcnt++;
+  } else {
+    GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
+
+    loop = g_slice_new0 (Loop);
+    loop->refcnt = 1;
+    loop->server = g_object_ref (server);
+    loop->mainctx = g_main_context_new ();
+    loop->mainloop = g_main_loop_new (loop->mainctx, FALSE);
+
+    g_thread_pool_push (klass->pool, loop, NULL);
+  }
+
+  g_queue_push_tail (&priv->loops, loop);
+
+  return loop;
+}
+
 static void
 unmanage_client (GstRTSPClient * client, ClientContext * ctx)
 {
@@ -954,10 +1018,16 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
   priv->clients = g_list_remove (priv->clients, ctx);
   GST_RTSP_SERVER_UNLOCK (server);
 
-  if (ctx->loop)
-    g_main_loop_quit (ctx->loop);
-  else
+  if (ctx->loop) {
+    GSource *src;
+
+    src = g_idle_source_new ();
+    g_source_set_callback (src, (GSourceFunc) free_client_context, ctx, NULL);
+    g_source_attach (src, ctx->loop->mainctx);
+    g_source_unref (src);
+  } else {
     free_client_context (ctx);
+  }
 
   g_object_unref (server);
 }
@@ -976,6 +1046,8 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
   ctx = g_slice_new0 (ClientContext);
   ctx->server = server;
   ctx->client = client;
+
+  GST_RTSP_SERVER_LOCK (server);
   if (priv->max_threads == 0) {
     GSource *source;
 
@@ -985,22 +1057,16 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
     else
       mainctx = NULL;
   } else {
-    mainctx = g_main_context_new ();
-    ctx->loop = g_main_loop_new (mainctx, TRUE);
-    g_main_context_unref (mainctx);
+    ctx->loop = gst_rtsp_server_get_main_loop (server);
+    mainctx = ctx->loop->mainctx;
   }
-  gst_rtsp_client_attach (client, mainctx);
 
-  GST_RTSP_SERVER_LOCK (server);
   g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx);
   priv->clients = g_list_prepend (priv->clients, ctx);
-  GST_RTSP_SERVER_UNLOCK (server);
 
-  if (ctx->loop) {
-    GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
+  gst_rtsp_client_attach (client, mainctx);
 
-    g_thread_pool_push (klass->pool, ctx, NULL);
-  }
+  GST_RTSP_SERVER_UNLOCK (server);
 }
 
 static GstRTSPClient *
index 524ca7f..9cd07ad 100644 (file)
@@ -742,7 +742,7 @@ GST_END_TEST;
 
 GST_START_TEST (test_play_multithreaded)
 {
-  gst_rtsp_server_set_max_threads (server, -1);
+  gst_rtsp_server_set_max_threads (server, 2);
 
   start_server ();
 
@@ -789,7 +789,7 @@ GST_START_TEST (test_play_multithreaded_block_in_describe)
   GstRTSPMessage *response;
   GstRTSPStatusCode code;
 
-  gst_rtsp_server_set_max_threads (server, 1);
+  gst_rtsp_server_set_max_threads (server, 2);
 
   mounts = gst_rtsp_server_get_mount_points (server);
   fail_unless (mounts != NULL);
@@ -873,7 +873,7 @@ GST_START_TEST (test_play_multithreaded_timeout_client)
   GstRTSPMessage *request;
   GstRTSPMessage *response;
 
-  gst_rtsp_server_set_max_threads (server, -1);
+  gst_rtsp_server_set_max_threads (server, 2);
   pool = gst_rtsp_server_get_session_pool (server);
   g_signal_connect (server, "client-connected",
       G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
@@ -952,7 +952,7 @@ GST_START_TEST (test_play_multithreaded_timeout_session)
   GstRTSPTransport *audio_transport = NULL;
   GstRTSPSessionPool *pool;
 
-  gst_rtsp_server_set_max_threads (server, -1);
+  gst_rtsp_server_set_max_threads (server, 2);
   pool = gst_rtsp_server_get_session_pool (server);
   g_signal_connect (server, "client-connected",
       G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);