don't break docs build
[platform/upstream/gstreamer.git] / gst / gsttask.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gsttask.c: Streaming tasks
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #include "gst_private.h"
24
25 #include "gstinfo.h"
26 #include "gsttask.h"
27
28 GST_DEBUG_CATEGORY (task_debug);
29 #define GST_CAT_DEFAULT (task_debug)
30
31 static void gst_task_class_init (GstTaskClass * klass);
32 static void gst_task_init (GstTask * task);
33 static void gst_task_finalize (GObject * object);
34
35 static void gst_task_func (GstTask * task, GstTaskClass * tclass);
36
37 static GstObjectClass *parent_class = NULL;
38
39 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
40
41 GType
42 gst_task_get_type (void)
43 {
44   static GType _gst_task_type = 0;
45
46   if (!_gst_task_type) {
47     static const GTypeInfo task_info = {
48       sizeof (GstTaskClass),
49       NULL,
50       NULL,
51       (GClassInitFunc) gst_task_class_init,
52       NULL,
53       NULL,
54       sizeof (GstTask),
55       0,
56       (GInstanceInitFunc) gst_task_init,
57       NULL
58     };
59
60     _gst_task_type =
61         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
62
63     GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
64   }
65   return _gst_task_type;
66 }
67
68 static void
69 gst_task_class_init (GstTaskClass * klass)
70 {
71   GObjectClass *gobject_class;
72
73   gobject_class = (GObjectClass *) klass;
74
75   parent_class = g_type_class_ref (GST_TYPE_OBJECT);
76
77   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
78
79   klass->pool = g_thread_pool_new (
80       (GFunc) gst_task_func, klass, -1, FALSE, NULL);
81 }
82
83 static void
84 gst_task_init (GstTask * task)
85 {
86   task->running = FALSE;
87   task->lock = NULL;
88   task->cond = g_cond_new ();
89   task->state = GST_TASK_STOPPED;
90 }
91
92 static void
93 gst_task_finalize (GObject * object)
94 {
95   GstTask *task = GST_TASK (object);
96
97   GST_DEBUG ("task %p finalize", task);
98
99   g_cond_free (task->cond);
100   task->cond = NULL;
101
102   G_OBJECT_CLASS (parent_class)->finalize (object);
103 }
104
105 static void
106 gst_task_func (GstTask * task, GstTaskClass * tclass)
107 {
108   GStaticRecMutex *lock;
109
110   GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ());
111
112   /* we have to grab the lock to get the mutex. We also
113    * mark our state running so that nobody can mess with
114    * the mutex. */
115   GST_LOCK (task);
116   if (task->state == GST_TASK_STOPPED)
117     goto exit;
118   lock = GST_TASK_GET_LOCK (task);
119   task->running = TRUE;
120   GST_UNLOCK (task);
121
122   /* locking order is TASK_LOCK, LOCK */
123   g_static_rec_mutex_lock (lock);
124   GST_LOCK (task);
125   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
126     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
127       gint t;
128
129       t = g_static_rec_mutex_unlock_full (lock);
130       if (t <= 0) {
131         g_warning ("wrong STREAM_LOCK count %d", t);
132       }
133       GST_TASK_SIGNAL (task);
134       GST_TASK_WAIT (task);
135       GST_UNLOCK (task);
136       /* locking order.. */
137       if (t > 0)
138         g_static_rec_mutex_lock_full (lock, t);
139
140       GST_LOCK (task);
141       if (task->state == GST_TASK_STOPPED)
142         goto done;
143     }
144     GST_UNLOCK (task);
145
146     task->func (task->data);
147
148     GST_LOCK (task);
149   }
150 done:
151   GST_UNLOCK (task);
152   g_static_rec_mutex_unlock (lock);
153
154   /* now we allow messing with the lock again */
155   GST_LOCK (task);
156   task->running = FALSE;
157 exit:
158   GST_TASK_SIGNAL (task);
159   GST_UNLOCK (task);
160
161   GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
162
163   gst_object_unref (task);
164 }
165
166 /**
167  * gst_task_cleanup_all:
168  *
169  * Wait for all tasks to be stopped. This is mainly used internally
170  * to ensure proper cleanup of internal datastructures in testsuites.
171  *
172  * MT safe.
173  */
174 void
175 gst_task_cleanup_all (void)
176 {
177   GstTaskClass *klass;
178
179   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
180     g_static_mutex_lock (&pool_lock);
181     if (klass->pool) {
182       /* Shut down all the threads, we still process the ones scheduled
183        * because the unref happens in the thread function.
184        * Also wait for currently running ones to finish. */
185       g_thread_pool_free (klass->pool, FALSE, TRUE);
186       /* create new pool, so we can still do something after this
187        * call. */
188       klass->pool = g_thread_pool_new (
189           (GFunc) gst_task_func, klass, -1, FALSE, NULL);
190     }
191     g_static_mutex_unlock (&pool_lock);
192   }
193 }
194
195 /**
196  * gst_task_create:
197  * @func: The #GstTaskFunction to use
198  * @data: User data to pass to @func
199  *
200  * Create a new Task that will repeadedly call the provided @func
201  * with @data as a parameter. Typically the task will run in
202  * a new thread.
203  *
204  * Returns: A new #GstTask.
205  *
206  * MT safe.
207  */
208 GstTask *
209 gst_task_create (GstTaskFunction func, gpointer data)
210 {
211   GstTask *task;
212
213   task = g_object_new (GST_TYPE_TASK, NULL);
214   task->func = func;
215   task->data = data;
216
217   GST_DEBUG ("Created task %p", task);
218
219   return task;
220 }
221
222 /**
223  * gst_task_set_lock:
224  * @task: The #GstTask to use
225  * @mutex: The GMutex to use
226  *
227  * Set the mutex used by the task.
228  *
229  * MT safe.
230  */
231 void
232 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
233 {
234   GST_LOCK (task);
235   if (task->running)
236     goto is_running;
237   GST_TASK_GET_LOCK (task) = mutex;
238   GST_UNLOCK (task);
239
240   return;
241
242   /* ERRORS */
243 is_running:
244   {
245     g_warning ("cannot call set_lock on a running task");
246     GST_UNLOCK (task);
247   }
248 }
249
250
251 /**
252  * gst_task_get_state:
253  * @task: The #GstTask to query
254  *
255  * Get the current state of the task.
256  *
257  * Returns: The #GstTaskState of the task
258  *
259  * MT safe.
260  */
261 GstTaskState
262 gst_task_get_state (GstTask * task)
263 {
264   GstTaskState result;
265
266   g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
267
268   GST_LOCK (task);
269   result = task->state;
270   GST_UNLOCK (task);
271
272   return result;
273 }
274
275 /**
276  * gst_task_start:
277  * @task: The #GstTask to start
278  *
279  * Starts @task.
280  *
281  * Returns: TRUE if the task could be started.
282  *
283  * MT safe.
284  */
285 gboolean
286 gst_task_start (GstTask * task)
287 {
288   GstTaskState old;
289
290   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
291
292   GST_DEBUG_OBJECT (task, "Starting task %p", task);
293
294   GST_LOCK (task);
295   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
296     goto no_lock;
297
298   old = task->state;
299   task->state = GST_TASK_STARTED;
300   switch (old) {
301     case GST_TASK_STOPPED:
302     {
303       GstTaskClass *tclass;
304
305       tclass = GST_TASK_GET_CLASS (task);
306
307       /* new task, push on threadpool. We ref before so
308        * that it remains alive while on the threadpool. */
309       gst_object_ref (task);
310       g_static_mutex_lock (&pool_lock);
311       g_thread_pool_push (tclass->pool, task, NULL);
312       g_static_mutex_unlock (&pool_lock);
313       break;
314     }
315     case GST_TASK_PAUSED:
316       /* PAUSE to PLAY, signal */
317       GST_TASK_SIGNAL (task);
318       break;
319     case GST_TASK_STARTED:
320       /* was OK */
321       break;
322   }
323   GST_UNLOCK (task);
324
325   return TRUE;
326
327   /* ERRORS */
328 no_lock:
329   {
330     g_warning ("starting task without a lock");
331     return FALSE;
332   }
333 }
334
335 /**
336  * gst_task_stop:
337  * @task: The #GstTask to stop
338  *
339  * Stops @task.
340  *
341  * Returns: TRUE if the task could be stopped.
342  *
343  * MT safe.
344  */
345 gboolean
346 gst_task_stop (GstTask * task)
347 {
348   GstTaskClass *tclass;
349   GstTaskState old;
350
351   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
352
353   tclass = GST_TASK_GET_CLASS (task);
354
355   GST_DEBUG_OBJECT (task, "Stopping task %p", task);
356
357   GST_LOCK (task);
358   old = task->state;
359   task->state = GST_TASK_STOPPED;
360   switch (old) {
361     case GST_TASK_STOPPED:
362       break;
363     case GST_TASK_PAUSED:
364       GST_TASK_SIGNAL (task);
365       break;
366     case GST_TASK_STARTED:
367       break;
368   }
369   GST_UNLOCK (task);
370
371   return TRUE;
372 }
373
374 /**
375  * gst_task_pause:
376  * @task: The #GstTask to pause
377  *
378  * Pauses @task.
379  *
380  * Returns: TRUE if the task could be paused.
381  *
382  * MT safe.
383  */
384 gboolean
385 gst_task_pause (GstTask * task)
386 {
387   GstTaskState old;
388
389   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
390
391   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
392
393   GST_LOCK (task);
394   old = task->state;
395   task->state = GST_TASK_PAUSED;
396   switch (old) {
397     case GST_TASK_STOPPED:
398     {
399       GstTaskClass *tclass;
400
401       tclass = GST_TASK_GET_CLASS (task);
402
403       gst_object_ref (task);
404       g_static_mutex_lock (&pool_lock);
405       g_thread_pool_push (tclass->pool, task, NULL);
406       g_static_mutex_unlock (&pool_lock);
407       break;
408     }
409     case GST_TASK_PAUSED:
410       break;
411     case GST_TASK_STARTED:
412       break;
413   }
414   GST_UNLOCK (task);
415
416   return TRUE;
417 }
418
419 /**
420  * gst_task_join:
421  * @task: The #GstTask to join
422  *
423  * Joins @task. After this call, it is safe to unref the task
424  * and clean up the lock set with #gst_task_set_lock().
425  *
426  * The task will automatically be stopped with this call.
427  *
428  * This function cannot be called from within a task function.
429  *
430  * Returns: TRUE if the task could be joined.
431  *
432  * MT safe.
433  */
434 gboolean
435 gst_task_join (GstTask * task)
436 {
437   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
438
439   GST_DEBUG_OBJECT (task, "Joining task %p", task);
440
441   GST_LOCK (task);
442   task->state = GST_TASK_STOPPED;
443   GST_TASK_SIGNAL (task);
444   while (task->running)
445     GST_TASK_WAIT (task);
446   GST_UNLOCK (task);
447
448   return TRUE;
449 }