paramspec: Move condition check inside the g_return
[platform/upstream/gstreamer.git] / gst / gsttaskpool.c
1 /* GStreamer
2  * Copyright (C) 2009 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gsttaskpool.c: Pool for streaming threads
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gsttaskpool
24  * @title: GstTaskPool
25  * @short_description: Pool of GStreamer streaming threads
26  * @see_also: #GstTask, #GstPad
27  *
28  * This object provides an abstraction for creating threads. The default
29  * implementation uses a regular GThreadPool to start tasks.
30  *
31  * Subclasses can be made to create custom threads.
32  */
33
34 #include "gst_private.h"
35
36 #include "gstinfo.h"
37 #include "gsttaskpool.h"
38
39 GST_DEBUG_CATEGORY_STATIC (taskpool_debug);
40 #define GST_CAT_DEFAULT (taskpool_debug)
41
42 #ifndef GST_DISABLE_GST_DEBUG
43 static void gst_task_pool_finalize (GObject * object);
44 #endif
45
46 #define _do_init \
47 { \
48   GST_DEBUG_CATEGORY_INIT (taskpool_debug, "taskpool", 0, "Thread pool"); \
49 }
50
51 G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
52
53 typedef struct
54 {
55   GstTaskPoolFunction func;
56   gpointer user_data;
57 } TaskData;
58
59 static void
60 default_func (TaskData * tdata, GstTaskPool * pool)
61 {
62   GstTaskPoolFunction func;
63   gpointer user_data;
64
65   func = tdata->func;
66   user_data = tdata->user_data;
67   g_slice_free (TaskData, tdata);
68
69   func (user_data);
70 }
71
72 static void
73 default_prepare (GstTaskPool * pool, GError ** error)
74 {
75   GST_OBJECT_LOCK (pool);
76   pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL);
77   GST_OBJECT_UNLOCK (pool);
78 }
79
80 static void
81 default_cleanup (GstTaskPool * pool)
82 {
83   GST_OBJECT_LOCK (pool);
84   if (pool->pool) {
85     /* Shut down all the threads, we still process the ones scheduled
86      * because the unref happens in the thread function.
87      * Also wait for currently running ones to finish. */
88     g_thread_pool_free (pool->pool, FALSE, TRUE);
89     pool->pool = NULL;
90   }
91   GST_OBJECT_UNLOCK (pool);
92 }
93
94 static gpointer
95 default_push (GstTaskPool * pool, GstTaskPoolFunction func,
96     gpointer user_data, GError ** error)
97 {
98   TaskData *tdata;
99
100   tdata = g_slice_new (TaskData);
101   tdata->func = func;
102   tdata->user_data = user_data;
103
104   GST_OBJECT_LOCK (pool);
105   if (pool->pool)
106     g_thread_pool_push (pool->pool, tdata, error);
107   else {
108     g_slice_free (TaskData, tdata);
109   }
110   GST_OBJECT_UNLOCK (pool);
111
112   return NULL;
113 }
114
115 static void
116 default_join (GstTaskPool * pool, gpointer id)
117 {
118   /* we do nothing here, we can't join from the pools */
119 }
120
121 static void
122 gst_task_pool_class_init (GstTaskPoolClass * klass)
123 {
124   GObjectClass *gobject_class;
125   GstTaskPoolClass *gsttaskpool_class;
126
127   gobject_class = (GObjectClass *) klass;
128   gsttaskpool_class = (GstTaskPoolClass *) klass;
129
130 #ifndef GST_DISABLE_GST_DEBUG
131   gobject_class->finalize = gst_task_pool_finalize;
132 #endif
133
134   gsttaskpool_class->prepare = default_prepare;
135   gsttaskpool_class->cleanup = default_cleanup;
136   gsttaskpool_class->push = default_push;
137   gsttaskpool_class->join = default_join;
138 }
139
140 static void
141 gst_task_pool_init (GstTaskPool * pool)
142 {
143 }
144
145 #ifndef GST_DISABLE_GST_DEBUG
146 static void
147 gst_task_pool_finalize (GObject * object)
148 {
149   GST_DEBUG ("taskpool %p finalize", object);
150
151   G_OBJECT_CLASS (gst_task_pool_parent_class)->finalize (object);
152 }
153 #endif
154 /**
155  * gst_task_pool_new:
156  *
157  * Create a new default task pool. The default task pool will use a regular
158  * GThreadPool for threads.
159  *
160  * Returns: (transfer full): a new #GstTaskPool. gst_object_unref() after usage.
161  */
162 GstTaskPool *
163 gst_task_pool_new (void)
164 {
165   GstTaskPool *pool;
166
167   pool = g_object_new (GST_TYPE_TASK_POOL, NULL);
168
169   /* clear floating flag */
170   gst_object_ref_sink (pool);
171
172   return pool;
173 }
174
175 /**
176  * gst_task_pool_prepare:
177  * @pool: a #GstTaskPool
178  * @error: an error return location
179  *
180  * Prepare the taskpool for accepting gst_task_pool_push() operations.
181  *
182  * MT safe.
183  */
184 void
185 gst_task_pool_prepare (GstTaskPool * pool, GError ** error)
186 {
187   GstTaskPoolClass *klass;
188
189   g_return_if_fail (GST_IS_TASK_POOL (pool));
190
191   klass = GST_TASK_POOL_GET_CLASS (pool);
192
193   if (klass->prepare)
194     klass->prepare (pool, error);
195 }
196
197 /**
198  * gst_task_pool_cleanup:
199  * @pool: a #GstTaskPool
200  *
201  * Wait for all tasks to be stopped. This is mainly used internally
202  * to ensure proper cleanup of internal data structures in test suites.
203  *
204  * MT safe.
205  */
206 void
207 gst_task_pool_cleanup (GstTaskPool * pool)
208 {
209   GstTaskPoolClass *klass;
210
211   g_return_if_fail (GST_IS_TASK_POOL (pool));
212
213   klass = GST_TASK_POOL_GET_CLASS (pool);
214
215   if (klass->cleanup)
216     klass->cleanup (pool);
217 }
218
219 /**
220  * gst_task_pool_push:
221  * @pool: a #GstTaskPool
222  * @func: (scope async): the function to call
223  * @user_data: (closure): data to pass to @func
224  * @error: return location for an error
225  *
226  * Start the execution of a new thread from @pool.
227  *
228  * Returns: (transfer none) (nullable): a pointer that should be used
229  * for the gst_task_pool_join function. This pointer can be %NULL, you
230  * must check @error to detect errors.
231  */
232 gpointer
233 gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func,
234     gpointer user_data, GError ** error)
235 {
236   GstTaskPoolClass *klass;
237
238   g_return_val_if_fail (GST_IS_TASK_POOL (pool), NULL);
239
240   klass = GST_TASK_POOL_GET_CLASS (pool);
241
242   if (klass->push == NULL)
243     goto not_supported;
244
245   return klass->push (pool, func, user_data, error);
246
247   /* ERRORS */
248 not_supported:
249   {
250     g_warning ("pushing tasks on pool %p is not supported", pool);
251     return NULL;
252   }
253 }
254
255 /**
256  * gst_task_pool_join:
257  * @pool: a #GstTaskPool
258  * @id: the id
259  *
260  * Join a task and/or return it to the pool. @id is the id obtained from
261  * gst_task_pool_push().
262  */
263 void
264 gst_task_pool_join (GstTaskPool * pool, gpointer id)
265 {
266   GstTaskPoolClass *klass;
267
268   g_return_if_fail (GST_IS_TASK_POOL (pool));
269
270   klass = GST_TASK_POOL_GET_CLASS (pool);
271
272   if (klass->join)
273     klass->join (pool, id);
274 }