Don't use deprecated g_object_newv()
[platform/upstream/gstreamer.git] / gst / gsttaskpool.c
1 /* GStreamer
2  * Copyright (C) 2009 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gsttaskpool.c: Pool for streaming threads
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gsttaskpool
24  * @title: GstTaskPool
25  * @short_description: Pool of GStreamer streaming threads
26  * @see_also: #GstTask, #GstPad
27  *
28  * This object provides an abstraction for creating threads. The default
29  * implementation uses a regular GThreadPool to start tasks.
30  *
31  * Subclasses can be made to create custom threads.
32  */
33
34 #include "gst_private.h"
35
36 #include "gstinfo.h"
37 #include "gsttaskpool.h"
38
39 GST_DEBUG_CATEGORY_STATIC (taskpool_debug);
40 #define GST_CAT_DEFAULT (taskpool_debug)
41
42 #ifndef GST_DISABLE_GST_DEBUG
43 static void gst_task_pool_finalize (GObject * object);
44 #endif
45
46 #define _do_init \
47 { \
48   GST_DEBUG_CATEGORY_INIT (taskpool_debug, "taskpool", 0, "Thread pool"); \
49 }
50
51 G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
52
53 typedef struct
54 {
55   GstTaskPoolFunction func;
56   gpointer user_data;
57 } TaskData;
58
59 static void
60 default_func (TaskData * tdata, GstTaskPool * pool)
61 {
62   GstTaskPoolFunction func;
63   gpointer user_data;
64
65   func = tdata->func;
66   user_data = tdata->user_data;
67   g_slice_free (TaskData, tdata);
68
69   func (user_data);
70 }
71
72 static void
73 default_prepare (GstTaskPool * pool, GError ** error)
74 {
75   GST_OBJECT_LOCK (pool);
76   pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL);
77   GST_OBJECT_UNLOCK (pool);
78 }
79
80 static void
81 default_cleanup (GstTaskPool * pool)
82 {
83   GST_OBJECT_LOCK (pool);
84   if (pool->pool) {
85     /* Shut down all the threads, we still process the ones scheduled
86      * because the unref happens in the thread function.
87      * Also wait for currently running ones to finish. */
88     g_thread_pool_free (pool->pool, FALSE, TRUE);
89     pool->pool = NULL;
90   }
91   GST_OBJECT_UNLOCK (pool);
92 }
93
94 static gpointer
95 default_push (GstTaskPool * pool, GstTaskPoolFunction func,
96     gpointer user_data, GError ** error)
97 {
98   TaskData *tdata;
99
100   tdata = g_slice_new (TaskData);
101   tdata->func = func;
102   tdata->user_data = user_data;
103
104   GST_OBJECT_LOCK (pool);
105   if (pool->pool)
106     g_thread_pool_push (pool->pool, tdata, error);
107   else {
108     g_slice_free (TaskData, tdata);
109   }
110   GST_OBJECT_UNLOCK (pool);
111
112   return NULL;
113 }
114
115 static void
116 default_join (GstTaskPool * pool, gpointer id)
117 {
118   /* we do nothing here, we can't join from the pools */
119 }
120
121 static void
122 gst_task_pool_class_init (GstTaskPoolClass * klass)
123 {
124   GObjectClass *gobject_class;
125   GstTaskPoolClass *gsttaskpool_class;
126
127   gobject_class = (GObjectClass *) klass;
128   gsttaskpool_class = (GstTaskPoolClass *) klass;
129
130 #ifndef GST_DISABLE_GST_DEBUG
131   gobject_class->finalize = gst_task_pool_finalize;
132 #endif
133
134   gsttaskpool_class->prepare = default_prepare;
135   gsttaskpool_class->cleanup = default_cleanup;
136   gsttaskpool_class->push = default_push;
137   gsttaskpool_class->join = default_join;
138 }
139
140 static void
141 gst_task_pool_init (GstTaskPool * pool)
142 {
143   /* clear floating flag */
144   gst_object_ref_sink (pool);
145 }
146
147 #ifndef GST_DISABLE_GST_DEBUG
148 static void
149 gst_task_pool_finalize (GObject * object)
150 {
151   GST_DEBUG ("taskpool %p finalize", object);
152
153   G_OBJECT_CLASS (gst_task_pool_parent_class)->finalize (object);
154 }
155 #endif
156 /**
157  * gst_task_pool_new:
158  *
159  * Create a new default task pool. The default task pool will use a regular
160  * GThreadPool for threads.
161  *
162  * Returns: (transfer full): a new #GstTaskPool. gst_object_unref() after usage.
163  */
164 GstTaskPool *
165 gst_task_pool_new (void)
166 {
167   GstTaskPool *pool;
168
169   pool = g_object_new (GST_TYPE_TASK_POOL, NULL);
170
171   return pool;
172 }
173
174 /**
175  * gst_task_pool_prepare:
176  * @pool: a #GstTaskPool
177  * @error: an error return location
178  *
179  * Prepare the taskpool for accepting gst_task_pool_push() operations.
180  *
181  * MT safe.
182  */
183 void
184 gst_task_pool_prepare (GstTaskPool * pool, GError ** error)
185 {
186   GstTaskPoolClass *klass;
187
188   g_return_if_fail (GST_IS_TASK_POOL (pool));
189
190   klass = GST_TASK_POOL_GET_CLASS (pool);
191
192   if (klass->prepare)
193     klass->prepare (pool, error);
194 }
195
196 /**
197  * gst_task_pool_cleanup:
198  * @pool: a #GstTaskPool
199  *
200  * Wait for all tasks to be stopped. This is mainly used internally
201  * to ensure proper cleanup of internal data structures in test suites.
202  *
203  * MT safe.
204  */
205 void
206 gst_task_pool_cleanup (GstTaskPool * pool)
207 {
208   GstTaskPoolClass *klass;
209
210   g_return_if_fail (GST_IS_TASK_POOL (pool));
211
212   klass = GST_TASK_POOL_GET_CLASS (pool);
213
214   if (klass->cleanup)
215     klass->cleanup (pool);
216 }
217
218 /**
219  * gst_task_pool_push:
220  * @pool: a #GstTaskPool
221  * @func: (scope async): the function to call
222  * @user_data: (closure): data to pass to @func
223  * @error: return location for an error
224  *
225  * Start the execution of a new thread from @pool.
226  *
227  * Returns: (transfer none) (nullable): a pointer that should be used
228  * for the gst_task_pool_join function. This pointer can be %NULL, you
229  * must check @error to detect errors.
230  */
231 gpointer
232 gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func,
233     gpointer user_data, GError ** error)
234 {
235   GstTaskPoolClass *klass;
236
237   g_return_val_if_fail (GST_IS_TASK_POOL (pool), NULL);
238
239   klass = GST_TASK_POOL_GET_CLASS (pool);
240
241   if (klass->push == NULL)
242     goto not_supported;
243
244   return klass->push (pool, func, user_data, error);
245
246   /* ERRORS */
247 not_supported:
248   {
249     g_warning ("pushing tasks on pool %p is not supported", pool);
250     return NULL;
251   }
252 }
253
254 /**
255  * gst_task_pool_join:
256  * @pool: a #GstTaskPool
257  * @id: the id
258  *
259  * Join a task and/or return it to the pool. @id is the id obtained from
260  * gst_task_pool_push().
261  */
262 void
263 gst_task_pool_join (GstTaskPool * pool, gpointer id)
264 {
265   GstTaskPoolClass *klass;
266
267   g_return_if_fail (GST_IS_TASK_POOL (pool));
268
269   klass = GST_TASK_POOL_GET_CLASS (pool);
270
271   if (klass->join)
272     klass->join (pool, id);
273 }