server: use thread pool
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 10 Jul 2013 14:49:55 +0000 (16:49 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 10 Jul 2013 15:02:58 +0000 (17:02 +0200)
Use the thread pool instead of doing our own thing.

gst/rtsp-server/rtsp-client.h
gst/rtsp-server/rtsp-server.c
gst/rtsp-server/rtsp-server.h

index a7311c2..5b6963f 100644 (file)
@@ -30,6 +30,7 @@ typedef struct _GstRTSPClientClass GstRTSPClientClass;
 typedef struct _GstRTSPClientState GstRTSPClientState;
 typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
 
+#include "rtsp-server.h"
 #include "rtsp-media.h"
 #include "rtsp-mount-points.h"
 #include "rtsp-session-pool.h"
@@ -49,6 +50,7 @@ typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
 
 /**
  * GstRTSPClientState:
+ * @server: the server
  * @client: the client
  * @request: the complete request
  * @uri: the complete url parsed from @request
@@ -65,6 +67,7 @@ typedef struct _GstRTSPClientPrivate GstRTSPClientPrivate;
  * Information passed around containing the client state of a request.
  */
 struct _GstRTSPClientState {
+  GstRTSPServer       *server;
   GstRTSPClient       *client;
   GstRTSPMessage      *request;
   GstRTSPUrl          *uri;
index f056d7d..981df7d 100644 (file)
@@ -38,7 +38,6 @@ struct _GstRTSPServerPrivate
   gchar *address;
   gchar *service;
   gint backlog;
-  gint max_threads;
   gboolean use_client_settings;
 
   GSocket *socket;
@@ -52,12 +51,14 @@ struct _GstRTSPServerPrivate
   /* authentication manager */
   GstRTSPAuth *auth;
 
+  /* resource manager */
+  GstRTSPThreadPool *thread_pool;
+
   /* the TLS certificate */
   GTlsCertificate *certificate;
 
   /* the clients that are connected */
   GList *clients;
-  GQueue loops;                 /* the main loops used in the threads */
 };
 
 #define DEFAULT_ADDRESS         "0.0.0.0"
@@ -65,7 +66,6 @@ struct _GstRTSPServerPrivate
 /* #define DEFAULT_ADDRESS         "::0" */
 #define DEFAULT_SERVICE         "8554"
 #define DEFAULT_BACKLOG         5
-#define DEFAULT_MAX_THREADS     0
 #define DEFAULT_USE_CLIENT_SETTINGS     FALSE
 
 /* Define to use the SO_LINGER option so that the server sockets can be resused
@@ -83,7 +83,6 @@ enum
 
   PROP_SESSION_POOL,
   PROP_MOUNT_POINTS,
-  PROP_MAX_THREADS,
   PROP_USE_CLIENT_SETTINGS,
   PROP_LAST
 };
@@ -110,7 +109,6 @@ static void gst_rtsp_server_set_property (GObject * object, guint propid,
     const GValue * value, GParamSpec * pspec);
 static void gst_rtsp_server_finalize (GObject * object);
 
-static gpointer do_loop (Loop * loop);
 static GstRTSPClient *default_create_client (GstRTSPServer * server);
 static gboolean default_setup_connection (GstRTSPServer * server,
     GstRTSPClient * client, GstRTSPConnection * conn);
@@ -198,18 +196,6 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass)
           GST_TYPE_RTSP_MOUNT_POINTS,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
-   * GstRTSPServer::max-threads:
-   *
-   * The maximum amount of threads to use for client connections. A value of
-   * 0 means to use only the mainloop, -1 means an unlimited amount of
-   * threads.
-   */
-  g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
-      g_param_spec_int ("max-threads", "Max Threads",
-          "The maximum amount of threads to use for client connections "
-          "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
-          DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-  /**
    * GstRTSPServer::use-client-settings:
    *
    * Use client transport settings (destination, port pair and ttl for
@@ -230,8 +216,6 @@ gst_rtsp_server_class_init (GstRTSPServerClass * klass)
   klass->create_client = default_create_client;
   klass->setup_connection = default_setup_connection;
 
-  klass->pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
-
   GST_DEBUG_CATEGORY_INIT (rtsp_server_debug, "rtspserver", 0, "GstRTSPServer");
 }
 
@@ -249,9 +233,8 @@ gst_rtsp_server_init (GstRTSPServer * server)
   priv->backlog = DEFAULT_BACKLOG;
   priv->session_pool = gst_rtsp_session_pool_new ();
   priv->mount_points = gst_rtsp_mount_points_new ();
-  priv->max_threads = DEFAULT_MAX_THREADS;
+  priv->thread_pool = gst_rtsp_thread_pool_new ();
   priv->use_client_settings = DEFAULT_USE_CLIENT_SETTINGS;
-  g_queue_init (&priv->loops);
 }
 
 static void
@@ -268,8 +251,12 @@ gst_rtsp_server_finalize (GObject * object)
   if (priv->socket)
     g_object_unref (priv->socket);
 
-  g_object_unref (priv->session_pool);
-  g_object_unref (priv->mount_points);
+  if (priv->session_pool)
+    g_object_unref (priv->session_pool);
+  if (priv->mount_points)
+    g_object_unref (priv->mount_points);
+  if (priv->thread_pool)
+    g_object_unref (priv->thread_pool);
 
   if (priv->auth)
     g_object_unref (priv->auth);
@@ -654,52 +641,60 @@ gst_rtsp_server_get_auth (GstRTSPServer * server)
 }
 
 /**
- * gst_rtsp_server_set_max_threads:
+ * gst_rtsp_server_set_thread_pool:
  * @server: a #GstRTSPServer
- * @max_threads: maximum threads
+ * @pool: a #GstRTSPThreadPool
  *
- * Set the maximum threads used by the server to handle client requests.
- * A value of 0 will use the server mainloop, a value of -1 will use an
- * unlimited number of threads.
+ * configure @pool to be used as the thread pool of @server.
  */
 void
-gst_rtsp_server_set_max_threads (GstRTSPServer * server, gint max_threads)
+gst_rtsp_server_set_thread_pool (GstRTSPServer * server,
+    GstRTSPThreadPool * pool)
 {
   GstRTSPServerPrivate *priv;
+  GstRTSPThreadPool *old;
 
   g_return_if_fail (GST_IS_RTSP_SERVER (server));
 
   priv = server->priv;
 
+  if (pool)
+    g_object_ref (pool);
+
   GST_RTSP_SERVER_LOCK (server);
-  priv->max_threads = max_threads;
+  old = priv->thread_pool;
+  priv->thread_pool = pool;
   GST_RTSP_SERVER_UNLOCK (server);
+
+  if (old)
+    g_object_unref (old);
 }
 
 /**
- * gst_rtsp_server_get_max_threads:
+ * gst_rtsp_server_get_thread_pool:
  * @server: a #GstRTSPServer
  *
- * Get the maximum number of threads used for client connections.
- * See gst_rtsp_server_set_max_threads().
+ * Get the #GstRTSPThreadPool used as the thread pool of @server.
  *
- * Returns: the maximum number of threads.
+ * Returns: (transfer full): the #GstRTSPThreadPool of @server. g_object_unref() after
+ * usage.
  */
-gint
-gst_rtsp_server_get_max_threads (GstRTSPServer * server)
+GstRTSPThreadPool *
+gst_rtsp_server_get_thread_pool (GstRTSPServer * server)
 {
   GstRTSPServerPrivate *priv;
-  gint res;
+  GstRTSPThreadPool *result;
 
-  g_return_val_if_fail (GST_IS_RTSP_SERVER (server), -1);
+  g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
 
   priv = server->priv;
 
   GST_RTSP_SERVER_LOCK (server);
-  res = priv->max_threads;
+  if ((result = priv->thread_pool))
+    g_object_ref (result);
   GST_RTSP_SERVER_UNLOCK (server);
 
-  return res;
+  return result;
 }
 
 /**
@@ -834,9 +829,6 @@ gst_rtsp_server_get_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       g_value_take_object (value, gst_rtsp_server_get_mount_points (server));
       break;
-    case PROP_MAX_THREADS:
-      g_value_set_int (value, gst_rtsp_server_get_max_threads (server));
-      break;
     case PROP_USE_CLIENT_SETTINGS:
       g_value_set_boolean (value,
           gst_rtsp_server_get_use_client_settings (server));
@@ -868,9 +860,6 @@ gst_rtsp_server_set_property (GObject * object, guint propid,
     case PROP_MOUNT_POINTS:
       gst_rtsp_server_set_mount_points (server, g_value_get_object (value));
       break;
-    case PROP_MAX_THREADS:
-      gst_rtsp_server_set_max_threads (server, g_value_get_int (value));
-      break;
     case PROP_USE_CLIENT_SETTINGS:
       gst_rtsp_server_set_use_client_settings (server,
           g_value_get_boolean (value));
@@ -1050,34 +1039,10 @@ close_error:
   }
 }
 
-struct _Loop
-{
-  gint refcnt;
-
-  GstRTSPServer *server;
-  GMainLoop *mainloop;
-  GMainContext *mainctx;
-};
-
-/* must be called with the lock held */
-static void
-loop_unref (Loop * loop)
-{
-  GstRTSPServer *server = loop->server;
-  GstRTSPServerPrivate *priv = server->priv;
-
-  loop->refcnt--;
-
-  if (loop->refcnt <= 0) {
-    g_queue_remove (&priv->loops, loop);
-    g_main_loop_quit (loop->mainloop);
-  }
-}
-
 struct _ClientContext
 {
   GstRTSPServer *server;
-  Loop *loop;
+  GstRTSPThread *thread;
   GstRTSPClient *client;
 };
 
@@ -1085,8 +1050,8 @@ static gboolean
 free_client_context (ClientContext * ctx)
 {
   GST_RTSP_SERVER_LOCK (ctx->server);
-  if (ctx->loop)
-    loop_unref (ctx->loop);
+  if (ctx->thread)
+    gst_rtsp_thread_stop (ctx->thread);
   GST_RTSP_SERVER_UNLOCK (ctx->server);
 
   g_object_unref (ctx->client);
@@ -1095,50 +1060,6 @@ free_client_context (ClientContext * ctx)
   return G_SOURCE_REMOVE;
 }
 
-static gpointer
-do_loop (Loop * loop)
-{
-  GST_INFO ("enter mainloop");
-  g_main_loop_run (loop->mainloop);
-  GST_INFO ("exit mainloop");
-
-  g_main_context_unref (loop->mainctx);
-  g_main_loop_unref (loop->mainloop);
-  g_object_unref (loop->server);
-  g_slice_free (Loop, loop);
-
-  return NULL;
-}
-
-/* Must be called with lock held */
-
-static Loop *
-gst_rtsp_server_get_main_loop (GstRTSPServer * server)
-{
-  GstRTSPServerPrivate *priv = server->priv;
-  Loop *loop;
-
-  if (priv->max_threads > 0 &&
-      g_queue_get_length (&priv->loops) >= priv->max_threads) {
-    loop = g_queue_pop_head (&priv->loops);
-    loop->refcnt++;
-  } else {
-    GstRTSPServerClass *klass = GST_RTSP_SERVER_GET_CLASS (server);
-
-    loop = g_slice_new0 (Loop);
-    loop->refcnt = 1;
-    loop->server = g_object_ref (server);
-    loop->mainctx = g_main_context_new ();
-    loop->mainloop = g_main_loop_new (loop->mainctx, FALSE);
-
-    g_thread_pool_push (klass->pool, loop, NULL);
-  }
-
-  g_queue_push_tail (&priv->loops, loop);
-
-  return loop;
-}
-
 static void
 unmanage_client (GstRTSPClient * client, ClientContext * ctx)
 {
@@ -1153,12 +1074,12 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
   priv->clients = g_list_remove (priv->clients, ctx);
   GST_RTSP_SERVER_UNLOCK (server);
 
-  if (ctx->loop) {
+  if (ctx->thread) {
     GSource *src;
 
     src = g_idle_source_new ();
     g_source_set_callback (src, (GSourceFunc) free_client_context, ctx, NULL);
-    g_source_attach (src, ctx->loop->mainctx);
+    g_source_attach (src, ctx->thread->context);
     g_source_unref (src);
   } else {
     free_client_context (ctx);
@@ -1174,7 +1095,8 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
 {
   ClientContext *ctx;
   GstRTSPServerPrivate *priv = server->priv;
-  GMainContext *mainctx;
+  GMainContext *mainctx = NULL;
+  GstRTSPClientState state = { NULL };
 
   GST_DEBUG_OBJECT (server, "manage client %p", client);
 
@@ -1183,17 +1105,19 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
   ctx->client = client;
 
   GST_RTSP_SERVER_LOCK (server);
-  if (priv->max_threads == 0) {
-    GSource *source;
 
+  state.server = server;
+  state.client = client;
+
+  ctx->thread = gst_rtsp_thread_pool_get_thread (priv->thread_pool,
+      GST_RTSP_THREAD_TYPE_CLIENT, &state);
+  if (ctx->thread)
+    mainctx = ctx->thread->context;
+  else {
+    GSource *source;
     /* find the context to add the watch */
     if ((source = g_main_current_source ()))
       mainctx = g_source_get_context (source);
-    else
-      mainctx = NULL;
-  } else {
-    ctx->loop = gst_rtsp_server_get_main_loop (server);
-    mainctx = ctx->loop->mainctx;
   }
 
   g_signal_connect (client, "closed", (GCallback) unmanage_client, ctx);
index a71c27f..b4e8393 100644 (file)
@@ -100,8 +100,8 @@ GstRTSPMountPoints *  gst_rtsp_server_get_mount_points     (GstRTSPServer *serve
 void                  gst_rtsp_server_set_auth             (GstRTSPServer *server, GstRTSPAuth *auth);
 GstRTSPAuth *         gst_rtsp_server_get_auth             (GstRTSPServer *server);
 
-void                  gst_rtsp_server_set_max_threads      (GstRTSPServer *server, gint max_threads);
-gint                  gst_rtsp_server_get_max_threads      (GstRTSPServer *server);
+void                  gst_rtsp_server_set_thread_pool      (GstRTSPServer *server, GstRTSPThreadPool *pool);
+GstRTSPThreadPool *   gst_rtsp_server_get_thread_pool      (GstRTSPServer *server);
 
 void                  gst_rtsp_server_set_use_client_settings (GstRTSPServer *server,
                                                                gboolean use_client_settings);