docs: update docs
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-thread-pool.c
1 /* GStreamer
2  * Copyright (C) 2013 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21
22 #include "rtsp-thread-pool.h"
23
24 typedef struct _GstRTSPThreadImpl
25 {
26   GstRTSPThread thread;
27
28   gint reused;
29 } GstRTSPThreadImpl;
30
31 GST_DEFINE_MINI_OBJECT_TYPE (GstRTSPThread, gst_rtsp_thread);
32
33 static void gst_rtsp_thread_init (GstRTSPThreadImpl * impl);
34
35 static void
36 _gst_rtsp_thread_free (GstRTSPThreadImpl * impl)
37 {
38   GST_DEBUG ("free thread %p", impl);
39
40   g_main_loop_unref (impl->thread.loop);
41   g_main_context_unref (impl->thread.context);
42   g_slice_free1 (sizeof (GstRTSPThreadImpl), impl);
43 }
44
45 static GstRTSPThread *
46 _gst_rtsp_thread_copy (GstRTSPThreadImpl * impl)
47 {
48   GstRTSPThreadImpl *copy;
49
50   GST_DEBUG ("copy thread %p", impl);
51
52   copy = g_slice_new0 (GstRTSPThreadImpl);
53   gst_rtsp_thread_init (copy);
54   copy->thread.context = g_main_context_ref (impl->thread.context);
55   copy->thread.loop = g_main_loop_ref (impl->thread.loop);
56
57   return GST_RTSP_THREAD (copy);
58 }
59
60 static void
61 gst_rtsp_thread_init (GstRTSPThreadImpl * impl)
62 {
63   gst_mini_object_init (GST_MINI_OBJECT_CAST (impl), 0,
64       GST_TYPE_RTSP_THREAD,
65       (GstMiniObjectCopyFunction) _gst_rtsp_thread_copy, NULL,
66       (GstMiniObjectFreeFunction) _gst_rtsp_thread_free);
67
68   g_atomic_int_set (&impl->reused, 1);
69 }
70
71 /**
72  * gst_rtsp_thread_new:
73  * @type: the thread type
74  *
75  * Create a new thread object that can run a mainloop.
76  *
77  * Returns: a #GstRTSPThread.
78  */
79 GstRTSPThread *
80 gst_rtsp_thread_new (GstRTSPThreadType type)
81 {
82   GstRTSPThreadImpl *impl;
83
84   impl = g_slice_new0 (GstRTSPThreadImpl);
85
86   gst_rtsp_thread_init (impl);
87   impl->thread.type = type;
88   impl->thread.context = g_main_context_new ();
89   impl->thread.loop = g_main_loop_new (impl->thread.context, TRUE);
90
91   return GST_RTSP_THREAD (impl);
92 }
93
94 /**
95  * gst_rtsp_thread_reuse:
96  * @thread: a #GstRTSPThread
97  *
98  * Reuse the mainloop of @thread
99  */
100 void
101 gst_rtsp_thread_reuse (GstRTSPThread * thread)
102 {
103   GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
104
105   g_return_if_fail (GST_IS_RTSP_THREAD (thread));
106
107   GST_DEBUG ("reuse thread %p", thread);
108   g_atomic_int_inc (&impl->reused);
109 }
110
111 /**
112  * gst_rtsp_thread_stop:
113  * @thread: a #GstRTSPThread
114  *
115  * Stop @thread. When no threads are using the mainloop, the thread will be
116  * stopped and the final ref to @thread will be released.
117  */
118 void
119 gst_rtsp_thread_stop (GstRTSPThread * thread)
120 {
121   GstRTSPThreadImpl *impl = (GstRTSPThreadImpl *) thread;
122
123   g_return_if_fail (GST_IS_RTSP_THREAD (thread));
124
125   GST_DEBUG ("stop thread %p", thread);
126
127   if (g_atomic_int_dec_and_test (&impl->reused)) {
128     GST_DEBUG ("stop mainloop of thread %p", thread);
129     g_main_loop_quit (thread->loop);
130   }
131 }
132
133 #define GST_RTSP_THREAD_POOL_GET_PRIVATE(obj)  \
134    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_THREAD_POOL, GstRTSPThreadPoolPrivate))
135
136 struct _GstRTSPThreadPoolPrivate
137 {
138   GMutex lock;
139
140   gint max_threads;
141   /* currently used mainloops */
142   GQueue threads;
143 };
144
145 #define DEFAULT_MAX_THREADS 1
146
147 enum
148 {
149   PROP_0,
150   PROP_MAX_THREADS,
151   PROP_LAST
152 };
153
154 GST_DEBUG_CATEGORY_STATIC (rtsp_thread_pool_debug);
155 #define GST_CAT_DEFAULT rtsp_thread_pool_debug
156
157 static GQuark thread_pool;
158
159 static void gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
160     GValue * value, GParamSpec * pspec);
161 static void gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
162     const GValue * value, GParamSpec * pspec);
163 static void gst_rtsp_thread_pool_finalize (GObject * obj);
164
165 static gpointer do_loop (GstRTSPThread * thread);
166 static GstRTSPThread *default_get_thread (GstRTSPThreadPool * pool,
167     GstRTSPThreadType type, GstRTSPClientState * state);
168
169 G_DEFINE_TYPE (GstRTSPThreadPool, gst_rtsp_thread_pool, G_TYPE_OBJECT);
170
171 static void
172 gst_rtsp_thread_pool_class_init (GstRTSPThreadPoolClass * klass)
173 {
174   GObjectClass *gobject_class;
175
176   g_type_class_add_private (klass, sizeof (GstRTSPThreadPoolPrivate));
177
178   gobject_class = G_OBJECT_CLASS (klass);
179
180   gobject_class->get_property = gst_rtsp_thread_pool_get_property;
181   gobject_class->set_property = gst_rtsp_thread_pool_set_property;
182   gobject_class->finalize = gst_rtsp_thread_pool_finalize;
183
184   /**
185    * GstRTSPThreadPool::max-threads:
186    *
187    * The maximum amount of threads to use for client connections. A value of
188    * 0 means to use only the mainloop, -1 means an unlimited amount of
189    * threads.
190    */
191   g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
192       g_param_spec_int ("max-threads", "Max Threads",
193           "The maximum amount of threads to use for client connections "
194           "(0 = only mainloop, -1 = unlimited)", -1, G_MAXINT,
195           DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   klass->get_thread = default_get_thread;
198
199   klass->pool = g_thread_pool_new ((GFunc) do_loop, klass, -1, FALSE, NULL);
200
201   GST_DEBUG_CATEGORY_INIT (rtsp_thread_pool_debug, "rtspthreadpool", 0,
202       "GstRTSPThreadPool");
203
204   thread_pool = g_quark_from_string ("gst.rtsp.thread.pool");
205 }
206
207 static void
208 gst_rtsp_thread_pool_init (GstRTSPThreadPool * pool)
209 {
210   GstRTSPThreadPoolPrivate *priv;
211
212   pool->priv = priv = GST_RTSP_THREAD_POOL_GET_PRIVATE (pool);
213
214   g_mutex_init (&priv->lock);
215   priv->max_threads = DEFAULT_MAX_THREADS;
216   g_queue_init (&priv->threads);
217 }
218
219 static void
220 gst_rtsp_thread_pool_finalize (GObject * obj)
221 {
222   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (obj);
223   GstRTSPThreadPoolPrivate *priv = pool->priv;
224
225   GST_INFO ("finalize pool %p", pool);
226
227   g_queue_clear (&priv->threads);
228   g_mutex_clear (&priv->lock);
229
230   G_OBJECT_CLASS (gst_rtsp_thread_pool_parent_class)->finalize (obj);
231 }
232
233 static void
234 gst_rtsp_thread_pool_get_property (GObject * object, guint propid,
235     GValue * value, GParamSpec * pspec)
236 {
237   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
238
239   switch (propid) {
240     case PROP_MAX_THREADS:
241       g_value_set_int (value, gst_rtsp_thread_pool_get_max_threads (pool));
242       break;
243     default:
244       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
245   }
246 }
247
248 static void
249 gst_rtsp_thread_pool_set_property (GObject * object, guint propid,
250     const GValue * value, GParamSpec * pspec)
251 {
252   GstRTSPThreadPool *pool = GST_RTSP_THREAD_POOL (object);
253
254   switch (propid) {
255     case PROP_MAX_THREADS:
256       gst_rtsp_thread_pool_set_max_threads (pool, g_value_get_int (value));
257       break;
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
260   }
261 }
262
263 static gpointer
264 do_loop (GstRTSPThread * thread)
265 {
266   GstRTSPThreadPoolPrivate *priv;
267   GstRTSPThreadPoolClass *klass;
268   GstRTSPThreadPool *pool;
269
270   pool = gst_mini_object_get_qdata (GST_MINI_OBJECT (thread), thread_pool);
271   priv = pool->priv;
272
273   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
274
275   if (klass->thread_enter)
276     klass->thread_enter (pool, thread);
277
278   GST_INFO ("enter mainloop of thread %p", thread);
279   g_main_loop_run (thread->loop);
280   GST_INFO ("exit mainloop of thread %p", thread);
281
282   if (klass->thread_leave)
283     klass->thread_leave (pool, thread);
284
285   g_mutex_lock (&priv->lock);
286   g_queue_remove (&priv->threads, thread);
287   g_mutex_unlock (&priv->lock);
288
289   gst_rtsp_thread_unref (thread);
290
291   return NULL;
292 }
293
294 /**
295  * gst_rtsp_thread_pool_new:
296  *
297  * Create a new #GstRTSPThreadPool instance.
298  *
299  * Returns: a new #GstRTSPThreadPool
300  */
301 GstRTSPThreadPool *
302 gst_rtsp_thread_pool_new (void)
303 {
304   GstRTSPThreadPool *result;
305
306   result = g_object_new (GST_TYPE_RTSP_THREAD_POOL, NULL);
307
308   return result;
309 }
310
311 /**
312  * gst_rtsp_thread_pool_set_max_threads:
313  * @pool: a #GstRTSPThreadPool
314  * @max_threads: maximum threads
315  *
316  * Set the maximum threads used by the pool to handle client requests.
317  * A value of 0 will use the pool mainloop, a value of -1 will use an
318  * unlimited number of threads.
319  */
320 void
321 gst_rtsp_thread_pool_set_max_threads (GstRTSPThreadPool * pool,
322     gint max_threads)
323 {
324   GstRTSPThreadPoolPrivate *priv;
325
326   g_return_if_fail (GST_IS_RTSP_THREAD_POOL (pool));
327
328   priv = pool->priv;
329
330   g_mutex_lock (&priv->lock);
331   priv->max_threads = max_threads;
332   g_mutex_unlock (&priv->lock);
333 }
334
335 /**
336  * gst_rtsp_thread_pool_get_max_threads:
337  * @pool: a #GstRTSPThreadPool
338  *
339  * Get the maximum number of threads used for client connections.
340  * See gst_rtsp_thread_pool_set_max_threads().
341  *
342  * Returns: the maximum number of threads.
343  */
344 gint
345 gst_rtsp_thread_pool_get_max_threads (GstRTSPThreadPool * pool)
346 {
347   GstRTSPThreadPoolPrivate *priv;
348   gint res;
349
350   g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), -1);
351
352   priv = pool->priv;
353
354   g_mutex_lock (&priv->lock);
355   res = priv->max_threads;
356   g_mutex_unlock (&priv->lock);
357
358   return res;
359 }
360
361 static GstRTSPThread *
362 make_thread (GstRTSPThreadPool * pool, GstRTSPThreadType type,
363     GstRTSPClientState * state)
364 {
365   GstRTSPThreadPoolClass *klass;
366   GstRTSPThread *thread;
367
368   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
369
370   thread = gst_rtsp_thread_new (type);
371   gst_mini_object_set_qdata (GST_MINI_OBJECT (thread), thread_pool,
372       g_object_ref (pool), g_object_unref);
373
374   GST_DEBUG_OBJECT (pool, "new thread %p", thread);
375
376   if (klass->configure_thread)
377     klass->configure_thread (pool, thread, state);
378
379   return thread;
380 }
381
382 static GstRTSPThread *
383 default_get_thread (GstRTSPThreadPool * pool,
384     GstRTSPThreadType type, GstRTSPClientState * state)
385 {
386   GstRTSPThreadPoolPrivate *priv = pool->priv;
387   GstRTSPThreadPoolClass *klass;
388   GstRTSPThread *thread;
389   GError *error = NULL;
390
391   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
392
393   switch (type) {
394     case GST_RTSP_THREAD_TYPE_CLIENT:
395       if (priv->max_threads == 0) {
396         /* no threads allowed */
397         GST_DEBUG_OBJECT (pool, "no client threads allowed");
398         thread = NULL;
399       } else {
400         if (priv->max_threads > 0 &&
401             g_queue_get_length (&priv->threads) >= priv->max_threads) {
402           /* max threads reached, recycle from queue */
403           GST_DEBUG_OBJECT (pool, "recycle client thread");
404           thread = g_queue_pop_head (&priv->threads);
405           gst_rtsp_thread_ref (thread);
406         } else {
407           /* make more threads */
408           GST_DEBUG_OBJECT (pool, "make new client thread");
409           thread = make_thread (pool, type, state);
410
411           if (!g_thread_pool_push (klass->pool, thread, &error))
412             goto thread_error;
413         }
414         g_queue_push_tail (&priv->threads, thread);
415       }
416       break;
417     case GST_RTSP_THREAD_TYPE_MEDIA:
418       GST_DEBUG_OBJECT (pool, "make new media thread");
419       thread = make_thread (pool, type, state);
420
421       if (!g_thread_pool_push (klass->pool, thread, &error))
422         goto thread_error;
423       break;
424     default:
425       thread = NULL;
426       break;
427   }
428   return thread;
429
430   /* ERRORS */
431 thread_error:
432   {
433     GST_ERROR_OBJECT (pool, "failed to push thread %s", error->message);
434     gst_rtsp_thread_unref (thread);
435     g_clear_error (&error);
436     return NULL;
437   }
438 }
439
440 /**
441  * gst_rtsp_thread_pool_get_thread:
442  * @pool: a #GstRTSPThreadPool
443  * @type: the #GstRTSPThreadType
444  * @state: a #GstRTSPClientState
445  *
446  * Get a new #GstRTSPThread for @type and @state.
447  *
448  * Returns: a new #GstRTSPThread, gst_rtsp_thread_stop() after usage
449  */
450 GstRTSPThread *
451 gst_rtsp_thread_pool_get_thread (GstRTSPThreadPool * pool,
452     GstRTSPThreadType type, GstRTSPClientState * state)
453 {
454   GstRTSPThreadPoolClass *klass;
455   GstRTSPThread *result = NULL;
456
457   g_return_val_if_fail (GST_IS_RTSP_THREAD_POOL (pool), NULL);
458   g_return_val_if_fail (state != NULL, NULL);
459
460   klass = GST_RTSP_THREAD_POOL_GET_CLASS (pool);
461
462   if (klass->get_thread)
463     result = klass->get_thread (pool, type, state);
464
465   return result;
466 }