2 * Copyright (C) 2013 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
20 * SECTION:rtsp-thread-pool
21 * @short_description: A pool of threads
22 * @see_also: #GstRTSPMedia, #GstRTSPClient
24 * A #GstRTSPThreadPool manages reusable threads for various server tasks.
25 * Currently the defined thread types can be found in #GstRTSPThreadType.
27 * Threads of type #GST_RTSP_THREAD_TYPE_CLIENT are used to handle requests from
28 * a connected client. With gst_rtsp_thread_pool_get_max_threads() a maximum
29 * number of threads can be set after which the pool will start to reuse the
30 * same thread for multiple clients.
32 * Threads of type #GST_RTSP_THREAD_TYPE_MEDIA will be used to perform the state
33 * changes of the media pipelines and handle its bus messages.
35 * gst_rtsp_thread_pool_get_thread() can be used to create a #GstRTSPThread
36 * object of the right type. The thread object contains a mainloop and context
37 * that run in a seperate thread and can be used to attached sources to.
39 * gst_rtsp_thread_reuse() can be used to reuse a thread for multiple purposes.
40 * If all gst_rtsp_thread_reuse() calls are matched with a
41 * gst_rtsp_thread_stop() call, the mainloop will be quit and the thread will
44 * To configure the threads, a subclass of this object should be made and the
45 * virtual methods should be overriden to implement the desired functionality.
47 * Last reviewed on 2013-07-11 (1.0.0)
52 #include "rtsp-thread-pool.h"
54 typedef struct _GstRTSPThreadImpl
62 GST_DEFINE_MINI_OBJECT_TYPE (GstRTSPThread, gst_rtsp_thread);
64 static void gst_rtsp_thread_init (GstRTSPThreadImpl * impl);
67 _gst_rtsp_thread_free (GstRTSPThreadImpl * impl)
69 GST_DEBUG ("free thread %p", impl);
71 g_source_unref (impl->source);
72 g_main_loop_unref (impl->thread.loop);
73 g_main_context_unref (impl->thread.context);
74 g_slice_free1 (sizeof (GstRTSPThreadImpl), impl);
77 static GstRTSPThread *
78 _gst_rtsp_thread_copy (GstRTSPThreadImpl * impl)
80 GstRTSPThreadImpl *copy;
82 GST_DEBUG ("copy thread %p", impl);
84 copy = g_slice_new0 (GstRTSPThreadImpl);
85 gst_rtsp_thread_init (copy);
86 copy->thread.context = g_main_context_ref (impl->thread.context);
87 copy->thread.loop = g_main_loop_ref (impl->thread.loop);
89 return GST_RTSP_THREAD (copy);
93 gst_rtsp_thread_init (GstRTSPThreadImpl * impl)
95 gst_mini_object_init (GST_MINI_OBJECT_CAST (impl), 0,
97 (GstMiniObjectCopyFunction) _gst_rtsp_thread_copy, NULL,
98 (GstMiniObjectFreeFunction) _gst_rtsp_thread_free);
100 g_atomic_int_set (&impl->reused, 1);
104 * gst_rtsp_thread_new:
105 * @type: the thread type
107 * Create a new thread object that can run a mainloop.
109 * Returns: (transfer full): a #GstRTSPThread.
112 gst_rtsp_thread_new (GstRTSPThreadType type)
114 GstRTSPThreadImpl *impl;
116 impl = g_slice_new0 (GstRTSPThreadImpl);
118 gst_rtsp_thread_init (impl);
119 impl->thread.type = type;
120 impl->thread.context = g_main_context_new ();
121 impl->thread.loop = g_main_loop_new (impl->thread.context, TRUE);
123 return GST_RTSP_THREAD (impl);
127 * gst_rtsp_thread_reuse:
128 * @thread: (transfer none): a #GstRTSPThread
130 * Reuse the mainloop of @thread
132 * Returns: %TRUE if the mainloop could be reused
135 gst_rtsp_thread_reuse (GstRTSPThread * thread)
137 GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
140 g_return_val_if_fail (GST_IS_RTSP_THREAD (thread), FALSE);
142 GST_DEBUG ("reuse thread %p", thread);
144 res = g_atomic_int_add (&impl->reused, 1) > 0;
146 gst_rtsp_thread_ref (thread);
152 do_quit (GstRTSPThread * thread)
154 GST_DEBUG ("stop mainloop of thread %p", thread);
155 g_main_loop_quit (thread->loop);
160 * gst_rtsp_thread_stop:
161 * @thread: (transfer full): a #GstRTSPThread
163 * Stop and unref @thread. When no threads are using the mainloop, the thread
164 * will be stopped and the final ref to @thread will be released.
167 gst_rtsp_thread_stop (GstRTSPThread * thread)
169 GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
171 g_return_if_fail (GST_IS_RTSP_THREAD (thread));
173 GST_DEBUG ("stop thread %p", thread);
175 if (g_atomic_int_dec_and_test (&impl->reused)) {
176 GST_DEBUG ("add idle source to quit mainloop of thread %p", thread);
177 impl->source = g_idle_source_new ();
178 g_source_set_callback (impl->source, (GSourceFunc) do_quit,
179 thread, (GDestroyNotify) gst_rtsp_thread_unref);
180 g_source_attach (impl->source, thread->context);
182 gst_rtsp_thread_unref (thread);
185 #define GST_RTSP_THREAD_POOL_GET_PRIVATE(obj) \
186 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPoolPrivate))
188 struct _GstRTSPThreadPoolPrivate
193 /* currently used mainloops */
197 #define DEFAULT_MAX_THREADS 1
206 GST_DEBUG_CATEGORY_STATIC (rtsp_thread_pool_debug);
207 #define GST_CAT_DEFAULT rtsp_thread_pool_debug
209 static GQuark thread_pool;
211 static void gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
212 GValue * value, GParamSpec * pspec);
213 static void gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
214 const GValue * value, GParamSpec * pspec);
215 static void gst_rtsp_thread_pool_finalize (GObject * obj);
217 static gpointer do_loop (GstRTSPThread * thread);
218 static GstRTSPThread *default_get_thread (GstRTSPThreadPool * pool,
219 GstRTSPThreadType type, GstRTSPContext * ctx);
221 G_DEFINE_TYPE (GstRTSPThreadPool, gst_rtsp_thread_pool, G_TYPE_OBJECT);
224 gst_rtsp_thread_pool_class_init (GstRTSPThreadPoolClass * klass)
226 GObjectClass *gobject_class;
228 g_type_class_add_private (klass, sizeof (GstRTSPThreadPoolPrivate));
230 gobject_class = G_OBJECT_CLASS (klass);
232 gobject_class->get_property = gst_rtsp_thread_pool_get_property;
233 gobject_class->set_property = gst_rtsp_thread_pool_set_property;
234 gobject_class->finalize = gst_rtsp_thread_pool_finalize;
237 * GstRTSPThreadPool::max-threads:
239 * The maximum amount of threads to use for client connections. A value of
240 * 0 means to use only the mainloop, -1 means an unlimited amount of
243 g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
244 g_param_spec_int ("max-threads", "Max Threads",
245 "The maximum amount of threads to use for client connections "
246 "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
247 DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249 klass->get_thread = default_get_thread;
251 GST_DEBUG_CATEGORY_INIT (rtsp_thread_pool_debug, "rtspthreadpool", 0,
252 "GstRTSPThreadPool");
254 thread_pool = g_quark_from_string ("gst.rtsp.thread.pool");
258 gst_rtsp_thread_pool_init (GstRTSPThreadPool * pool)
260 GstRTSPThreadPoolPrivate *priv;
262 pool->priv = priv = GST_RTSP_THREAD_POOL_GET_PRIVATE (pool);
264 g_mutex_init (&priv->lock);
265 priv->max_threads = DEFAULT_MAX_THREADS;
266 g_queue_init (&priv->threads);
270 gst_rtsp_thread_pool_finalize (GObject * obj)
272 GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (obj);
273 GstRTSPThreadPoolPrivate *priv = pool->priv;
275 GST_INFO ("finalize pool %p", pool);
277 g_queue_clear (&priv->threads);
278 g_mutex_clear (&priv->lock);
280 G_OBJECT_CLASS (gst_rtsp_thread_pool_parent_class)->finalize (obj);
284 gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
285 GValue * value, GParamSpec * pspec)
287 GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
290 case PROP_MAX_THREADS:
291 g_value_set_int (value, gst_rtsp_thread_pool_get_max_threads (pool));
294 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
299 gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
300 const GValue * value, GParamSpec * pspec)
302 GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
305 case PROP_MAX_THREADS:
306 gst_rtsp_thread_pool_set_max_threads (pool, g_value_get_int (value));
309 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
314 do_loop (GstRTSPThread * thread)
316 GstRTSPThreadPoolPrivate *priv;
317 GstRTSPThreadPoolClass *klass;
318 GstRTSPThreadPool *pool;
320 pool = gst_mini_object_get_qdata (GST_MINI_OBJECT (thread), thread_pool);
323 klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
325 if (klass->thread_enter)
326 klass->thread_enter (pool, thread);
328 GST_INFO ("enter mainloop of thread %p", thread);
329 g_main_loop_run (thread->loop);
330 GST_INFO ("exit mainloop of thread %p", thread);
332 if (klass->thread_leave)
333 klass->thread_leave (pool, thread);
335 g_mutex_lock (&priv->lock);
336 g_queue_remove (&priv->threads, thread);
337 g_mutex_unlock (&priv->lock);
339 gst_rtsp_thread_unref (thread);
345 * gst_rtsp_thread_pool_new:
347 * Create a new #GstRTSPThreadPool instance.
349 * Returns: (transfer full): a new #GstRTSPThreadPool
352 gst_rtsp_thread_pool_new (void)
354 GstRTSPThreadPool *result;
356 result = g_object_new (GST_TYPE_RTSP_THREAD_POOL, NULL);
362 * gst_rtsp_thread_pool_set_max_threads:
363 * @pool: a #GstRTSPThreadPool
364 * @max_threads: maximum threads
366 * Set the maximum threads used by the pool to handle client requests.
367 * A value of 0 will use the pool mainloop, a value of -1 will use an
368 * unlimited number of threads.
371 gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool,
374 GstRTSPThreadPoolPrivate *priv;
376 g_return_if_fail (GST_IS_RTSP_THREAD_POOL (pool));
380 g_mutex_lock (&priv->lock);
381 priv->max_threads = max_threads;
382 g_mutex_unlock (&priv->lock);
386 * gst_rtsp_thread_pool_get_max_threads:
387 * @pool: a #GstRTSPThreadPool
389 * Get the maximum number of threads used for client connections.
390 * See gst_rtsp_thread_pool_set_max_threads().
392 * Returns: the maximum number of threads.
395 gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool)
397 GstRTSPThreadPoolPrivate *priv;
400 g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), -1);
404 g_mutex_lock (&priv->lock);
405 res = priv->max_threads;
406 g_mutex_unlock (&priv->lock);
411 static GstRTSPThread *
412 make_thread (GstRTSPThreadPool * pool, GstRTSPThreadType type,
413 GstRTSPContext * ctx)
415 GstRTSPThreadPoolClass *klass;
416 GstRTSPThread *thread;
418 klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
420 thread = gst_rtsp_thread_new (type);
421 gst_mini_object_set_qdata (GST_MINI_OBJECT (thread), thread_pool,
422 g_object_ref (pool), g_object_unref);
424 GST_DEBUG_OBJECT (pool, "new thread %p", thread);
426 if (klass->configure_thread)
427 klass->configure_thread (pool, thread, ctx);
432 static GstRTSPThread *
433 default_get_thread (GstRTSPThreadPool * pool,
434 GstRTSPThreadType type, GstRTSPContext * ctx)
436 GstRTSPThreadPoolPrivate *priv = pool->priv;
437 GstRTSPThreadPoolClass *klass;
438 GstRTSPThread *thread;
439 GError *error = NULL;
441 klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
444 case GST_RTSP_THREAD_TYPE_CLIENT:
445 if (priv->max_threads == 0) {
446 /* no threads allowed */
447 GST_DEBUG_OBJECT (pool, "no client threads allowed");
450 g_mutex_lock (&priv->lock);
452 if (priv->max_threads > 0 &&
453 g_queue_get_length (&priv->threads) >= priv->max_threads) {
454 /* max threads reached, recycle from queue */
455 thread = g_queue_pop_head (&priv->threads);
456 GST_DEBUG_OBJECT (pool, "recycle client thread %p", thread);
457 if (!gst_rtsp_thread_reuse (thread)) {
458 GST_DEBUG_OBJECT (pool, "thread %p stopping, retry", thread);
459 /* this can happen if we just decremented the reuse counter of the
460 * thread and signaled the mainloop that it should stop. We leave
461 * the thread out of the queue now, there is no point to add it
462 * again, it will be removed from the mainloop otherwise after it
467 /* make more threads */
468 GST_DEBUG_OBJECT (pool, "make new client thread");
469 thread = make_thread (pool, type, ctx);
471 if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
475 g_queue_push_tail (&priv->threads, thread);
476 g_mutex_unlock (&priv->lock);
479 case GST_RTSP_THREAD_TYPE_MEDIA:
480 GST_DEBUG_OBJECT (pool, "make new media thread");
481 thread = make_thread (pool, type, ctx);
483 if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
496 GST_ERROR_OBJECT (pool, "failed to push thread %s", error->message);
497 gst_rtsp_thread_unref (thread);
498 /* drop also the ref dedicated for the pool */
499 gst_rtsp_thread_unref (thread);
500 g_clear_error (&error);
506 * gst_rtsp_thread_pool_get_thread:
507 * @pool: a #GstRTSPThreadPool
508 * @type: the #GstRTSPThreadType
509 * @ctx: (transfer none): a #GstRTSPContext
511 * Get a new #GstRTSPThread for @type and @ctx.
513 * Returns: (transfer full): a new #GstRTSPThread, gst_rtsp_thread_stop() after usage
516 gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool * pool,
517 GstRTSPThreadType type, GstRTSPContext * ctx)
519 GstRTSPThreadPoolClass *klass;
520 GstRTSPThread *result = NULL;
522 g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), NULL);
524 klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
526 /* We want to be thread safe as there might be 2 threads wanting to get new
527 * #GstRTSPThread at the same time
529 if (G_UNLIKELY (!g_atomic_pointer_get (&klass->pool))) {
531 t_pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
532 if (!g_atomic_pointer_compare_and_exchange (&klass->pool, NULL, t_pool))
533 g_thread_pool_free (t_pool, FALSE, TRUE);
536 if (klass->get_thread)
537 result = klass->get_thread (pool, type, ctx);
543 * gst_rtsp_thread_pool_cleanup:
545 * Wait for all tasks to be stopped and free all allocated resources. This is
546 * mainly used in test suites to ensure proper cleanup of internal data
550 gst_rtsp_thread_pool_cleanup (void)
552 GstRTSPThreadPoolClass *klass;
555 GST_RTSP_THREAD_POOL_CLASS (g_type_class_peek
556 (gst_rtsp_thread_pool_get_type ()));
557 if (klass->pool != NULL) {
558 g_thread_pool_free (klass->pool, FALSE, TRUE);