*.c: Don't cast to GST_OBJECT when reffing or unreffing. Large source-munging commit!!!
[platform/upstream/gstreamer.git] / gst / schedulers / threadscheduler.c
1 /* GStreamer2
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  *
4  * threadscheduler.c: scheduler using threads 
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #  include "config.h"
24 #endif
25
26 #include <gst/gst.h>
27 #include "../gst-i18n-lib.h"
28
29 GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
30 #define GST_CAT_DEFAULT debug_scheduler
31
32 #define GST_TYPE_THREAD_SCHEDULER \
33   (gst_thread_scheduler_get_type ())
34 #define GST_THREAD_SCHEDULER(obj) \
35   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER,GstThreadScheduler))
36 #define GST_THREAD_SCHEDULER_CLASS(klass) \
37   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER,GstThreadSchedulerClass))
38 #define GST_IS_THREAD_SCHEDULER(obj) \
39   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER))
40 #define GST_IS_THREAD_SCHEDULER_CLASS(obj) \
41   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER))
42
43 #define SCHED(element) (GST_THREAD_SCHEDULER ((element)->sched))
44
45 GType gst_thread_scheduler_get_type (void);
46
47 typedef struct _GstThreadScheduler GstThreadScheduler;
48 typedef struct _GstThreadSchedulerClass GstThreadSchedulerClass;
49
50 struct _GstThreadScheduler
51 {
52   GstScheduler scheduler;
53
54   GThreadPool *pool;
55 };
56
57 struct _GstThreadSchedulerClass
58 {
59   GstSchedulerClass scheduler_class;
60 };
61
62 #define ELEMENT_PRIVATE(element) GST_ELEMENT (element)->sched_private
63 #define PAD_PRIVATE(pad) (GST_REAL_PAD (pad))->sched_private
64
65 #define GST_TYPE_THREAD_SCHEDULER_TASK \
66   (gst_thread_scheduler_task_get_type ())
67 #define GST_THREAD_SCHEDULER_TASK(obj) \
68   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTask))
69 #define GST_THREAD_SCHEDULER_TASK_CLASS(klass) \
70   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_THREAD_SCHEDULER_TASK,GstThreadSchedulerTaskClass))
71 #define GST_IS_THREAD_SCHEDULER_TASK(obj) \
72   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_THREAD_SCHEDULER_TASK))
73 #define GST_IS_THREAD_SCHEDULER_TASK_CLASS(obj) \
74   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_THREAD_SCHEDULER_TASK))
75
76 typedef struct _GstThreadSchedulerTask GstThreadSchedulerTask;
77 typedef struct _GstThreadSchedulerTaskClass GstThreadSchedulerTaskClass;
78
79 struct _GstThreadSchedulerTask
80 {
81   GstTask task;
82 };
83
84 struct _GstThreadSchedulerTaskClass
85 {
86   GstTaskClass parent_class;
87 };
88
89 static void gst_thread_scheduler_task_class_init (gpointer g_class,
90     gpointer data);
91 static void gst_thread_scheduler_task_init (GstThreadSchedulerTask * object);
92
93 static gboolean gst_thread_scheduler_task_start (GstTask * task);
94 static gboolean gst_thread_scheduler_task_stop (GstTask * task);
95 static gboolean gst_thread_scheduler_task_pause (GstTask * task);
96
97 GType
98 gst_thread_scheduler_task_get_type (void)
99 {
100   static GType object_type = 0;
101
102   if (object_type == 0) {
103     static const GTypeInfo object_info = {
104       sizeof (GstThreadSchedulerTaskClass),
105       NULL,
106       NULL,
107       gst_thread_scheduler_task_class_init,
108       NULL,
109       NULL,
110       sizeof (GstThreadSchedulerTask),
111       0,
112       (GInstanceInitFunc) gst_thread_scheduler_task_init
113     };
114
115     object_type =
116         g_type_register_static (GST_TYPE_TASK,
117         "GstThreadSchedulerTask", &object_info, 0);
118   }
119   return object_type;
120 }
121
122 static void
123 gst_thread_scheduler_task_class_init (gpointer klass, gpointer class_data)
124 {
125   GstTaskClass *task = GST_TASK_CLASS (klass);
126
127   task->start = gst_thread_scheduler_task_start;
128   task->stop = gst_thread_scheduler_task_stop;
129   task->pause = gst_thread_scheduler_task_pause;
130 }
131
132 static void
133 gst_thread_scheduler_task_init (GstThreadSchedulerTask * task)
134 {
135   GST_TASK (task)->state = GST_TASK_STOPPED;
136 }
137
138 static gboolean
139 gst_thread_scheduler_task_start (GstTask * task)
140 {
141   GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
142   GstThreadScheduler *tsched =
143       GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task)));
144   GstTaskState old;
145   GStaticRecMutex *lock;
146
147   GST_DEBUG_OBJECT (task, "Starting task %p", task);
148
149   if ((lock = GST_TASK_GET_LOCK (task)) == NULL) {
150     lock = g_new (GStaticRecMutex, 1);
151     g_static_rec_mutex_init (lock);
152     GST_TASK_GET_LOCK (task) = lock;
153   }
154
155   GST_LOCK (ttask);
156   old = GST_TASK_CAST (ttask)->state;
157   GST_TASK_CAST (ttask)->state = GST_TASK_STARTED;
158   switch (old) {
159     case GST_TASK_STOPPED:
160       gst_object_ref (task);
161       g_thread_pool_push (tsched->pool, task, NULL);
162       break;
163     case GST_TASK_PAUSED:
164       GST_TASK_SIGNAL (ttask);
165       break;
166     case GST_TASK_STARTED:
167       break;
168   }
169   GST_UNLOCK (ttask);
170
171   return TRUE;
172 }
173
174 static gboolean
175 gst_thread_scheduler_task_stop (GstTask * task)
176 {
177   GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
178   GstTaskState old;
179
180   GST_DEBUG_OBJECT (task, "Stopping task %p", task);
181
182   GST_LOCK (ttask);
183   old = GST_TASK_CAST (ttask)->state;
184   GST_TASK_CAST (ttask)->state = GST_TASK_STOPPED;
185   switch (old) {
186     case GST_TASK_STOPPED:
187       break;
188     case GST_TASK_PAUSED:
189       GST_TASK_SIGNAL (ttask);
190       break;
191     case GST_TASK_STARTED:
192       break;
193   }
194   GST_UNLOCK (ttask);
195
196   return TRUE;
197 }
198
199 static gboolean
200 gst_thread_scheduler_task_pause (GstTask * task)
201 {
202   GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
203   GstThreadScheduler *tsched =
204       GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task)));
205   GstTaskState old;
206
207   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
208
209   GST_LOCK (ttask);
210   old = GST_TASK_CAST (ttask)->state;
211   GST_TASK_CAST (ttask)->state = GST_TASK_PAUSED;
212   switch (old) {
213     case GST_TASK_STOPPED:
214       gst_object_ref (task);
215       g_thread_pool_push (tsched->pool, task, NULL);
216       break;
217     case GST_TASK_PAUSED:
218       break;
219     case GST_TASK_STARTED:
220       break;
221   }
222   GST_UNLOCK (ttask);
223
224   return TRUE;
225 }
226
227 static void gst_thread_scheduler_class_init (gpointer g_class, gpointer data);
228 static void gst_thread_scheduler_init (GstThreadScheduler * object);
229
230 GType
231 gst_thread_scheduler_get_type (void)
232 {
233   static GType object_type = 0;
234
235   if (object_type == 0) {
236     static const GTypeInfo object_info = {
237       sizeof (GstThreadSchedulerClass),
238       NULL,
239       NULL,
240       gst_thread_scheduler_class_init,
241       NULL,
242       NULL,
243       sizeof (GstThreadScheduler),
244       0,
245       (GInstanceInitFunc) gst_thread_scheduler_init
246     };
247
248     object_type =
249         g_type_register_static (GST_TYPE_SCHEDULER,
250         "GstThreadScheduler", &object_info, 0);
251   }
252   return object_type;
253 }
254
255 static void gst_thread_scheduler_setup (GstScheduler * sched);
256 static void gst_thread_scheduler_reset (GstScheduler * sched);
257 static GstTask *gst_thread_scheduler_create_task (GstScheduler * sched,
258     GstTaskFunction func, gpointer data);
259
260 static void
261 gst_thread_scheduler_class_init (gpointer klass, gpointer class_data)
262 {
263   GstSchedulerClass *scheduler = GST_SCHEDULER_CLASS (klass);
264
265   scheduler->setup = gst_thread_scheduler_setup;
266   scheduler->reset = gst_thread_scheduler_reset;
267   scheduler->create_task = gst_thread_scheduler_create_task;
268 }
269
270 static void
271 gst_thread_scheduler_func (GstThreadSchedulerTask * ttask,
272     GstThreadScheduler * sched)
273 {
274   GstTask *task = GST_TASK (ttask);
275
276   GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
277       g_thread_self ());
278
279   /* locking order is TASK_LOCK, LOCK */
280   GST_TASK_LOCK (task);
281   GST_LOCK (task);
282   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
283     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
284       GST_TASK_UNLOCK (task);
285       GST_TASK_SIGNAL (task);
286       GST_TASK_WAIT (task);
287       GST_UNLOCK (task);
288       /* locking order.. */
289       GST_TASK_LOCK (task);
290       GST_LOCK (task);
291       if (task->state == GST_TASK_STOPPED)
292         goto done;
293     }
294     GST_UNLOCK (task);
295
296     task->func (task->data);
297
298     GST_LOCK (task);
299   }
300 done:
301   GST_UNLOCK (task);
302   GST_TASK_UNLOCK (task);
303
304   GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());
305
306   gst_object_unref (task);
307 }
308
309 static void
310 gst_thread_scheduler_init (GstThreadScheduler * scheduler)
311 {
312   scheduler->pool = g_thread_pool_new (
313       (GFunc) gst_thread_scheduler_func, scheduler, -1, FALSE, NULL);
314 }
315
316 static GstTask *
317 gst_thread_scheduler_create_task (GstScheduler * sched, GstTaskFunction func,
318     gpointer data)
319 {
320   GstThreadSchedulerTask *task;
321
322   task =
323       GST_THREAD_SCHEDULER_TASK (g_object_new (GST_TYPE_THREAD_SCHEDULER_TASK,
324           NULL));
325   gst_object_set_parent (GST_OBJECT (task), GST_OBJECT (sched));
326   GST_TASK_CAST (task)->func = func;
327   GST_TASK_CAST (task)->data = data;
328
329   GST_DEBUG_OBJECT (sched, "Created task %p", task);
330
331   return GST_TASK_CAST (task);
332 }
333
334 static void
335 gst_thread_scheduler_setup (GstScheduler * sched)
336 {
337 }
338
339 static void
340 gst_thread_scheduler_reset (GstScheduler * sched)
341 {
342 }
343
344 static gboolean
345 plugin_init (GstPlugin * plugin)
346 {
347   GstSchedulerFactory *factory;
348
349   GST_DEBUG_CATEGORY_INIT (debug_scheduler, "thread", 0, "thread scheduler");
350
351   factory = gst_scheduler_factory_new ("thread",
352       "A scheduler using threads", GST_TYPE_THREAD_SCHEDULER);
353   if (factory == NULL)
354     return FALSE;
355
356   gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
357   return TRUE;
358 }
359
360 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, "gstthreadscheduler",
361     "a thread scheduler", plugin_init, VERSION, GST_LICENSE, GST_PACKAGE,
362     GST_ORIGIN)