rtsp-thread-pool.c: fix clang 10 warning
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-thread-pool.c
1 /* GStreamer
2  * Copyright (C) 2013 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-thread-pool
21  * @short_description: A pool of threads
22  * @see_also: #GstRTSPMedia, #GstRTSPClient
23  *
24  * A #GstRTSPThreadPool manages reusable threads for various server tasks.
25  * Currently the defined thread types can be found in #GstRTSPThreadType.
26  *
27  * Threads of type #GST_RTSP_THREAD_TYPE_CLIENT are used to handle requests from
28  * a connected client. With gst_rtsp_thread_pool_get_max_threads() a maximum
29  * number of threads can be set after which the pool will start to reuse the
30  * same thread for multiple clients.
31  *
32  * Threads of type #GST_RTSP_THREAD_TYPE_MEDIA will be used to perform the state
33  * changes of the media pipelines and handle its bus messages.
34  *
35  * gst_rtsp_thread_pool_get_thread() can be used to create a #GstRTSPThread
36  * object of the right type. The thread object contains a mainloop and context
37  * that run in a seperate thread and can be used to attached sources to.
38  *
39  * gst_rtsp_thread_reuse() can be used to reuse a thread for multiple purposes.
40  * If all gst_rtsp_thread_reuse() calls are matched with a
41  * gst_rtsp_thread_stop() call, the mainloop will be quit and the thread will
42  * stop.
43  *
44  * To configure the threads, a subclass of this object should be made and the
45  * virtual methods should be overriden to implement the desired functionality.
46  *
47  * Last reviewed on 2013-07-11 (1.0.0)
48  */
49 #ifdef HAVE_CONFIG_H
50 #include "config.h"
51 #endif
52
53 #include <string.h>
54
55 #include "rtsp-thread-pool.h"
56
57 typedef struct _GstRTSPThreadImpl
58 {
59   GstRTSPThread thread;
60
61   gint reused;
62   GSource *source;
63   /* FIXME, the source has to be part of GstRTSPThreadImpl, due to a bug in GLib:
64    * https://bugzilla.gnome.org/show_bug.cgi?id=720186 */
65 } GstRTSPThreadImpl;
66
67 GST_DEFINE_MINI_OBJECT_TYPE (GstRTSPThread, gst_rtsp_thread);
68
69 static void gst_rtsp_thread_init (GstRTSPThreadImpl * impl);
70
71 static void
72 _gst_rtsp_thread_free (GstRTSPThreadImpl * impl)
73 {
74   GST_DEBUG ("free thread %p", impl);
75
76   g_source_unref (impl->source);
77   g_main_loop_unref (impl->thread.loop);
78   g_main_context_unref (impl->thread.context);
79   g_slice_free1 (sizeof (GstRTSPThreadImpl), impl);
80 }
81
82 static GstRTSPThread *
83 _gst_rtsp_thread_copy (GstRTSPThreadImpl * impl)
84 {
85   GstRTSPThreadImpl *copy;
86
87   GST_DEBUG ("copy thread %p", impl);
88
89   copy = g_slice_new0 (GstRTSPThreadImpl);
90   gst_rtsp_thread_init (copy);
91   copy->thread.context = g_main_context_ref (impl->thread.context);
92   copy->thread.loop = g_main_loop_ref (impl->thread.loop);
93
94   return GST_RTSP_THREAD (copy);
95 }
96
97 static void
98 gst_rtsp_thread_init (GstRTSPThreadImpl * impl)
99 {
100   gst_mini_object_init (GST_MINI_OBJECT_CAST (impl), 0,
101       GST_TYPE_RTSP_THREAD,
102       (GstMiniObjectCopyFunction) _gst_rtsp_thread_copy, NULL,
103       (GstMiniObjectFreeFunction) _gst_rtsp_thread_free);
104
105   g_atomic_int_set (&impl->reused, 1);
106 }
107
108 /**
109  * gst_rtsp_thread_new:
110  * @type: the thread type
111  *
112  * Create a new thread object that can run a mainloop.
113  *
114  * Returns: (transfer full): a #GstRTSPThread.
115  */
116 GstRTSPThread *
117 gst_rtsp_thread_new (GstRTSPThreadType type)
118 {
119   GstRTSPThreadImpl *impl;
120
121   impl = g_slice_new0 (GstRTSPThreadImpl);
122
123   gst_rtsp_thread_init (impl);
124   impl->thread.type = type;
125   impl->thread.context = g_main_context_new ();
126   impl->thread.loop = g_main_loop_new (impl->thread.context, TRUE);
127
128   return GST_RTSP_THREAD (impl);
129 }
130
131 /**
132  * gst_rtsp_thread_reuse:
133  * @thread: (transfer none): a #GstRTSPThread
134  *
135  * Reuse the mainloop of @thread
136  *
137  * Returns: %TRUE if the mainloop could be reused
138  */
139 gboolean
140 gst_rtsp_thread_reuse (GstRTSPThread * thread)
141 {
142   GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
143   gboolean res;
144
145   g_return_val_if_fail (GST_IS_RTSP_THREAD (thread), FALSE);
146
147   GST_DEBUG ("reuse thread %p", thread);
148
149   res = g_atomic_int_add (&impl->reused, 1) > 0;
150   if (res)
151     gst_rtsp_thread_ref (thread);
152
153   return res;
154 }
155
156 static gboolean
157 do_quit (GstRTSPThread * thread)
158 {
159   GST_DEBUG ("stop mainloop of thread %p", thread);
160   g_main_loop_quit (thread->loop);
161   return FALSE;
162 }
163
164 /**
165  * gst_rtsp_thread_stop:
166  * @thread: (transfer full): a #GstRTSPThread
167  *
168  * Stop and unref @thread. When no threads are using the mainloop, the thread
169  * will be stopped and the final ref to @thread will be released.
170  */
171 void
172 gst_rtsp_thread_stop (GstRTSPThread * thread)
173 {
174   GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
175
176   g_return_if_fail (GST_IS_RTSP_THREAD (thread));
177
178   GST_DEBUG ("stop thread %p", thread);
179
180   if (g_atomic_int_dec_and_test (&impl->reused)) {
181     GST_DEBUG ("add idle source to quit mainloop of thread %p", thread);
182     impl->source = g_idle_source_new ();
183     g_source_set_callback (impl->source, (GSourceFunc) do_quit,
184         thread, (GDestroyNotify) gst_rtsp_thread_unref);
185     g_source_attach (impl->source, thread->context);
186   } else
187     gst_rtsp_thread_unref (thread);
188 }
189
190 struct _GstRTSPThreadPoolPrivate
191 {
192   GMutex lock;
193
194   gint max_threads;
195   /* currently used mainloops */
196   GQueue threads;
197 };
198
199 #define DEFAULT_MAX_THREADS 1
200
201 enum
202 {
203   PROP_0,
204   PROP_MAX_THREADS,
205   PROP_LAST
206 };
207
208 GST_DEBUG_CATEGORY_STATIC (rtsp_thread_pool_debug);
209 #define GST_CAT_DEFAULT rtsp_thread_pool_debug
210
211 static GQuark thread_pool;
212
213 static void gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
214     GValue * value, GParamSpec * pspec);
215 static void gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
216     const GValue * value, GParamSpec * pspec);
217 static void gst_rtsp_thread_pool_finalize (GObject * obj);
218
219 static gpointer do_loop (GstRTSPThread * thread);
220 static GstRTSPThread *default_get_thread (GstRTSPThreadPool * pool,
221     GstRTSPThreadType type, GstRTSPContext * ctx);
222
223 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPThreadPool, gst_rtsp_thread_pool,
224     G_TYPE_OBJECT);
225
226 static void
227 gst_rtsp_thread_pool_class_init (GstRTSPThreadPoolClass * klass)
228 {
229   GObjectClass *gobject_class;
230
231   gobject_class = G_OBJECT_CLASS (klass);
232
233   gobject_class->get_property = gst_rtsp_thread_pool_get_property;
234   gobject_class->set_property = gst_rtsp_thread_pool_set_property;
235   gobject_class->finalize = gst_rtsp_thread_pool_finalize;
236
237   /**
238    * GstRTSPThreadPool::max-threads:
239    *
240    * The maximum amount of threads to use for client connections. A value of
241    * 0 means to use only the mainloop, -1 means an unlimited amount of
242    * threads.
243    */
244   g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
245       g_param_spec_int ("max-threads", "Max Threads",
246           "The maximum amount of threads to use for client connections "
247           "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
248           DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249
250   klass->get_thread = default_get_thread;
251
252   GST_DEBUG_CATEGORY_INIT (rtsp_thread_pool_debug, "rtspthreadpool", 0,
253       "GstRTSPThreadPool");
254
255   thread_pool = g_quark_from_string ("gst.rtsp.thread.pool");
256 }
257
258 static void
259 gst_rtsp_thread_pool_init (GstRTSPThreadPool * pool)
260 {
261   GstRTSPThreadPoolPrivate *priv;
262
263   pool->priv = priv = gst_rtsp_thread_pool_get_instance_private (pool);
264
265   g_mutex_init (&priv->lock);
266   priv->max_threads = DEFAULT_MAX_THREADS;
267   g_queue_init (&priv->threads);
268 }
269
270 static void
271 gst_rtsp_thread_pool_finalize (GObject * obj)
272 {
273   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (obj);
274   GstRTSPThreadPoolPrivate *priv = pool->priv;
275
276   GST_INFO ("finalize pool %p", pool);
277
278   g_queue_clear (&priv->threads);
279   g_mutex_clear (&priv->lock);
280
281   G_OBJECT_CLASS (gst_rtsp_thread_pool_parent_class)->finalize (obj);
282 }
283
284 static void
285 gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
286     GValue * value, GParamSpec * pspec)
287 {
288   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
289
290   switch (propid) {
291     case PROP_MAX_THREADS:
292       g_value_set_int (value, gst_rtsp_thread_pool_get_max_threads (pool));
293       break;
294     default:
295       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
296   }
297 }
298
299 static void
300 gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
304
305   switch (propid) {
306     case PROP_MAX_THREADS:
307       gst_rtsp_thread_pool_set_max_threads (pool, g_value_get_int (value));
308       break;
309     default:
310       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
311   }
312 }
313
314 static gpointer
315 do_loop (GstRTSPThread * thread)
316 {
317   GstRTSPThreadPoolPrivate *priv;
318   GstRTSPThreadPoolClass *klass;
319   GstRTSPThreadPool *pool;
320
321   pool = gst_mini_object_get_qdata (GST_MINI_OBJECT (thread), thread_pool);
322   priv = pool->priv;
323
324   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
325
326   if (klass->thread_enter)
327     klass->thread_enter (pool, thread);
328
329   GST_INFO ("enter mainloop of thread %p", thread);
330   g_main_loop_run (thread->loop);
331   GST_INFO ("exit mainloop of thread %p", thread);
332
333   if (klass->thread_leave)
334     klass->thread_leave (pool, thread);
335
336   g_mutex_lock (&priv->lock);
337   g_queue_remove (&priv->threads, thread);
338   g_mutex_unlock (&priv->lock);
339
340   gst_rtsp_thread_unref (thread);
341
342   return NULL;
343 }
344
345 /**
346  * gst_rtsp_thread_pool_new:
347  *
348  * Create a new #GstRTSPThreadPool instance.
349  *
350  * Returns: (transfer full): a new #GstRTSPThreadPool
351  */
352 GstRTSPThreadPool *
353 gst_rtsp_thread_pool_new (void)
354 {
355   GstRTSPThreadPool *result;
356
357   result = g_object_new (GST_TYPE_RTSP_THREAD_POOL, NULL);
358
359   return result;
360 }
361
362 /**
363  * gst_rtsp_thread_pool_set_max_threads:
364  * @pool: a #GstRTSPThreadPool
365  * @max_threads: maximum threads
366  *
367  * Set the maximum threads used by the pool to handle client requests.
368  * A value of 0 will use the pool mainloop, a value of -1 will use an
369  * unlimited number of threads.
370  */
371 void
372 gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool,
373     gint max_threads)
374 {
375   GstRTSPThreadPoolPrivate *priv;
376
377   g_return_if_fail (GST_IS_RTSP_THREAD_POOL (pool));
378
379   priv = pool->priv;
380
381   g_mutex_lock (&priv->lock);
382   priv->max_threads = max_threads;
383   g_mutex_unlock (&priv->lock);
384 }
385
386 /**
387  * gst_rtsp_thread_pool_get_max_threads:
388  * @pool: a #GstRTSPThreadPool
389  *
390  * Get the maximum number of threads used for client connections.
391  * See gst_rtsp_thread_pool_set_max_threads().
392  *
393  * Returns: the maximum number of threads.
394  */
395 gint
396 gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool)
397 {
398   GstRTSPThreadPoolPrivate *priv;
399   gint res;
400
401   g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), -1);
402
403   priv = pool->priv;
404
405   g_mutex_lock (&priv->lock);
406   res = priv->max_threads;
407   g_mutex_unlock (&priv->lock);
408
409   return res;
410 }
411
412 static GstRTSPThread *
413 make_thread (GstRTSPThreadPool * pool, GstRTSPThreadType type,
414     GstRTSPContext * ctx)
415 {
416   GstRTSPThreadPoolClass *klass;
417   GstRTSPThread *thread;
418
419   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
420
421   thread = gst_rtsp_thread_new (type);
422   gst_mini_object_set_qdata (GST_MINI_OBJECT (thread), thread_pool,
423       g_object_ref (pool), g_object_unref);
424
425   GST_DEBUG_OBJECT (pool, "new thread %p", thread);
426
427   if (klass->configure_thread)
428     klass->configure_thread (pool, thread, ctx);
429
430   return thread;
431 }
432
433 static GstRTSPThread *
434 default_get_thread (GstRTSPThreadPool * pool,
435     GstRTSPThreadType type, GstRTSPContext * ctx)
436 {
437   GstRTSPThreadPoolPrivate *priv = pool->priv;
438   GstRTSPThreadPoolClass *klass;
439   GstRTSPThread *thread;
440   GError *error = NULL;
441
442   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
443
444   switch (type) {
445     case GST_RTSP_THREAD_TYPE_CLIENT:
446       if (priv->max_threads == 0) {
447         /* no threads allowed */
448         GST_DEBUG_OBJECT (pool, "no client threads allowed");
449         thread = NULL;
450       } else {
451         g_mutex_lock (&priv->lock);
452       retry:
453         if (priv->max_threads > 0 &&
454             g_queue_get_length (&priv->threads) >= priv->max_threads) {
455           /* max threads reached, recycle from queue */
456           thread = g_queue_pop_head (&priv->threads);
457           GST_DEBUG_OBJECT (pool, "recycle client thread %p", thread);
458           if (!gst_rtsp_thread_reuse (thread)) {
459             GST_DEBUG_OBJECT (pool, "thread %p stopping, retry", thread);
460             /* this can happen if we just decremented the reuse counter of the
461              * thread and signaled the mainloop that it should stop. We leave
462              * the thread out of the queue now, there is no point to add it
463              * again, it will be removed from the mainloop otherwise after it
464              * stops. */
465             goto retry;
466           }
467         } else {
468           /* make more threads */
469           GST_DEBUG_OBJECT (pool, "make new client thread");
470           thread = make_thread (pool, type, ctx);
471
472           if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
473                   &error))
474             goto thread_error;
475         }
476         g_queue_push_tail (&priv->threads, thread);
477         g_mutex_unlock (&priv->lock);
478       }
479       break;
480     case GST_RTSP_THREAD_TYPE_MEDIA:
481       GST_DEBUG_OBJECT (pool, "make new media thread");
482       thread = make_thread (pool, type, ctx);
483
484       if (!g_thread_pool_push (klass->pool, gst_rtsp_thread_ref (thread),
485               &error))
486         goto thread_error;
487       break;
488     default:
489       thread = NULL;
490       break;
491   }
492   return thread;
493
494   /* ERRORS */
495 thread_error:
496   {
497     GST_ERROR_OBJECT (pool, "failed to push thread %s", error->message);
498     gst_rtsp_thread_unref (thread);
499     /* drop also the ref dedicated for the pool */
500     gst_rtsp_thread_unref (thread);
501     g_clear_error (&error);
502     return NULL;
503   }
504 }
505
506 /**
507  * gst_rtsp_thread_pool_get_thread:
508  * @pool: a #GstRTSPThreadPool
509  * @type: the #GstRTSPThreadType
510  * @ctx: (transfer none): a #GstRTSPContext
511  *
512  * Get a new #GstRTSPThread for @type and @ctx.
513  *
514  * Returns: (transfer full) (nullable): a new #GstRTSPThread,
515  * gst_rtsp_thread_stop() after usage
516  */
517 GstRTSPThread *
518 gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool * pool,
519     GstRTSPThreadType type, GstRTSPContext * ctx)
520 {
521   GstRTSPThreadPoolClass *klass;
522   GstRTSPThread *result = NULL;
523
524   g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), NULL);
525
526   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
527
528   /* We want to be thread safe as there might be 2 threads wanting to get new
529    * #GstRTSPThread at the same time
530    */
531   if (G_UNLIKELY (!g_atomic_pointer_get (&klass->pool))) {
532     GThreadPool *t_pool;
533     t_pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
534     if (!g_atomic_pointer_compare_and_exchange (&klass->pool,
535             (GThreadPool *) NULL, t_pool))
536       g_thread_pool_free (t_pool, FALSE, TRUE);
537   }
538
539   if (klass->get_thread)
540     result = klass->get_thread (pool, type, ctx);
541
542   return result;
543 }
544
545 /**
546  * gst_rtsp_thread_pool_cleanup:
547  *
548  * Wait for all tasks to be stopped and free all allocated resources. This is
549  * mainly used in test suites to ensure proper cleanup of internal data
550  * structures.
551  */
552 void
553 gst_rtsp_thread_pool_cleanup (void)
554 {
555   GstRTSPThreadPoolClass *klass;
556
557   klass =
558       GST_RTSP_THREAD_POOL_CLASS (g_type_class_ref
559       (gst_rtsp_thread_pool_get_type ()));
560   if (klass->pool != NULL) {
561     g_thread_pool_free (klass->pool, FALSE, TRUE);
562     klass->pool = NULL;
563   }
564   g_type_class_unref (klass);
565 }