tests/check/: use the new macro
[platform/upstream/gstreamer.git] / gst / gsttask.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gsttask.c: Streaming tasks
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gsttask
25  * @short_description: Abstraction of GStreamer streaming threads.
26  * @see_also: #GstElement, #GstPad
27  *
28  * #GstTask is used by #GstElement and #GstPad to provide the data passing
29  * threads in a #GstPipeline.
30  *
31  * A #GstPad will typically start a #GstTask to push or pull data to/from the
32  * peer pads. Most source elements start a #GstTask to push data. In some cases
33  * a demuxer element can start a #GstTask to pull data from a peer element. This
34  * is typically done when the demuxer can perform random access on the upstream
35  * peer element for improved performance.
36  *
37  * Although convenience functions exist on #GstPad to start/pause/stop tasks, it 
38  * might sometimes be needed to create a #GstTask manually if it is not related to
39  * a #GstPad.
40  *
41  * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
42  * gst_task_set_lock().
43  *
44  * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
45  * and gst_task_stop() respectively.
46  *
47  * A #GstTask will repeadedly call the #GstTaskFunction with the user data
48  * that was provided when creating the task with gst_task_create(). Before calling
49  * the function it will acquire the provided lock.
50  *
51  * Stopping a task with gst_task_stop() will not immediatly make sure the task is
52  * not running anymore. Use gst_task_join() to make sure the task is completely
53  * stopped and the thread is stopped.
54  *
55  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
56  * only be done it the task is not running anymore.
57  *
58  * Last reviewed on 2006-02-13 (0.10.4)
59  */
60
61 #include "gst_private.h"
62
63 #include "gstinfo.h"
64 #include "gsttask.h"
65
66 GST_DEBUG_CATEGORY_STATIC (task_debug);
67 #define GST_CAT_DEFAULT (task_debug)
68
69 static void gst_task_class_init (GstTaskClass * klass);
70 static void gst_task_init (GstTask * task);
71 static void gst_task_finalize (GObject * object);
72
73 static void gst_task_func (GstTask * task, GstTaskClass * tclass);
74
75 static GstObjectClass *parent_class = NULL;
76
77 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
78
79 GType
80 gst_task_get_type (void)
81 {
82   static GType _gst_task_type = 0;
83
84   if (G_UNLIKELY (_gst_task_type == 0)) {
85     static const GTypeInfo task_info = {
86       sizeof (GstTaskClass),
87       NULL,
88       NULL,
89       (GClassInitFunc) gst_task_class_init,
90       NULL,
91       NULL,
92       sizeof (GstTask),
93       0,
94       (GInstanceInitFunc) gst_task_init,
95       NULL
96     };
97
98     _gst_task_type =
99         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
100
101     GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
102   }
103   return _gst_task_type;
104 }
105
106 static void
107 gst_task_class_init (GstTaskClass * klass)
108 {
109   GObjectClass *gobject_class;
110
111   gobject_class = (GObjectClass *) klass;
112
113   parent_class = g_type_class_peek_parent (klass);
114
115   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
116
117   klass->pool = g_thread_pool_new (
118       (GFunc) gst_task_func, klass, -1, FALSE, NULL);
119 }
120
121 static void
122 gst_task_init (GstTask * task)
123 {
124   task->running = FALSE;
125   task->abidata.ABI.thread = NULL;
126   task->lock = NULL;
127   task->cond = g_cond_new ();
128   task->state = GST_TASK_STOPPED;
129 }
130
131 static void
132 gst_task_finalize (GObject * object)
133 {
134   GstTask *task = GST_TASK (object);
135
136   GST_DEBUG ("task %p finalize", task);
137
138   /* task thread cannot be running here since it holds a ref
139    * to the task so that the finalize could not have happened */
140   g_cond_free (task->cond);
141   task->cond = NULL;
142
143   G_OBJECT_CLASS (parent_class)->finalize (object);
144 }
145
146 static void
147 gst_task_func (GstTask * task, GstTaskClass * tclass)
148 {
149   GStaticRecMutex *lock;
150   GThread *tself;
151
152   tself = g_thread_self ();
153
154   GST_DEBUG ("Entering task %p, thread %p", task, tself);
155
156   /* we have to grab the lock to get the mutex. We also
157    * mark our state running so that nobody can mess with
158    * the mutex. */
159   GST_OBJECT_LOCK (task);
160   if (task->state == GST_TASK_STOPPED)
161     goto exit;
162   lock = GST_TASK_GET_LOCK (task);
163   if (G_UNLIKELY (lock == NULL))
164     goto no_lock;
165   task->running = TRUE;
166   task->abidata.ABI.thread = tself;
167   GST_OBJECT_UNLOCK (task);
168
169   /* locking order is TASK_LOCK, LOCK */
170   g_static_rec_mutex_lock (lock);
171   GST_OBJECT_LOCK (task);
172   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
173     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
174       gint t;
175
176       t = g_static_rec_mutex_unlock_full (lock);
177       if (t <= 0) {
178         g_warning ("wrong STREAM_LOCK count %d", t);
179       }
180       GST_TASK_SIGNAL (task);
181       GST_TASK_WAIT (task);
182       GST_OBJECT_UNLOCK (task);
183       /* locking order.. */
184       if (t > 0)
185         g_static_rec_mutex_lock_full (lock, t);
186
187       GST_OBJECT_LOCK (task);
188       if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
189         goto done;
190     }
191     GST_OBJECT_UNLOCK (task);
192
193     task->func (task->data);
194
195     GST_OBJECT_LOCK (task);
196   }
197 done:
198   GST_OBJECT_UNLOCK (task);
199   g_static_rec_mutex_unlock (lock);
200
201   /* now we allow messing with the lock again */
202   GST_OBJECT_LOCK (task);
203   task->running = FALSE;
204   task->abidata.ABI.thread = NULL;
205 exit:
206   GST_TASK_SIGNAL (task);
207   GST_OBJECT_UNLOCK (task);
208
209   GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
210
211   gst_object_unref (task);
212   return;
213
214 no_lock:
215   {
216     g_warning ("starting task without a lock");
217     goto exit;
218   }
219 }
220
221 /**
222  * gst_task_cleanup_all:
223  *
224  * Wait for all tasks to be stopped. This is mainly used internally
225  * to ensure proper cleanup of internal datastructures in testsuites.
226  *
227  * MT safe.
228  */
229 void
230 gst_task_cleanup_all (void)
231 {
232   GstTaskClass *klass;
233
234   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
235     g_static_mutex_lock (&pool_lock);
236     if (klass->pool) {
237       /* Shut down all the threads, we still process the ones scheduled
238        * because the unref happens in the thread function.
239        * Also wait for currently running ones to finish. */
240       g_thread_pool_free (klass->pool, FALSE, TRUE);
241       /* create new pool, so we can still do something after this
242        * call. */
243       klass->pool = g_thread_pool_new (
244           (GFunc) gst_task_func, klass, -1, FALSE, NULL);
245     }
246     g_static_mutex_unlock (&pool_lock);
247   }
248 }
249
250 /**
251  * gst_task_create:
252  * @func: The #GstTaskFunction to use
253  * @data: User data to pass to @func
254  *
255  * Create a new Task that will repeadedly call the provided @func
256  * with @data as a parameter. Typically the task will run in
257  * a new thread.
258  *
259  * Returns: A new #GstTask.
260  *
261  * MT safe.
262  */
263 GstTask *
264 gst_task_create (GstTaskFunction func, gpointer data)
265 {
266   GstTask *task;
267
268   task = g_object_new (GST_TYPE_TASK, NULL);
269   task->func = func;
270   task->data = data;
271
272   GST_DEBUG ("Created task %p", task);
273
274   return task;
275 }
276
277 /**
278  * gst_task_set_lock:
279  * @task: The #GstTask to use
280  * @mutex: The GMutex to use
281  *
282  * Set the mutex used by the task. The mutex will be acquired before
283  * calling the #GstTaskFunction.
284  *
285  * This function has to be called before calling gst_task_pause() or
286  * gst_task_start().
287  *
288  * MT safe.
289  */
290 void
291 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
292 {
293   GST_OBJECT_LOCK (task);
294   if (task->running)
295     goto is_running;
296   GST_TASK_GET_LOCK (task) = mutex;
297   GST_OBJECT_UNLOCK (task);
298
299   return;
300
301   /* ERRORS */
302 is_running:
303   {
304     GST_OBJECT_UNLOCK (task);
305     g_warning ("cannot call set_lock on a running task");
306   }
307 }
308
309
310 /**
311  * gst_task_get_state:
312  * @task: The #GstTask to query
313  *
314  * Get the current state of the task.
315  *
316  * Returns: The #GstTaskState of the task
317  *
318  * MT safe.
319  */
320 GstTaskState
321 gst_task_get_state (GstTask * task)
322 {
323   GstTaskState result;
324
325   g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
326
327   GST_OBJECT_LOCK (task);
328   result = task->state;
329   GST_OBJECT_UNLOCK (task);
330
331   return result;
332 }
333
334 /**
335  * gst_task_start:
336  * @task: The #GstTask to start
337  *
338  * Starts @task. The @task must have a lock associated with it using
339  * gst_task_set_lock() or thsi function will return FALSE.
340  *
341  * Returns: TRUE if the task could be started.
342  *
343  * MT safe.
344  */
345 gboolean
346 gst_task_start (GstTask * task)
347 {
348   GstTaskState old;
349
350   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
351
352   GST_DEBUG_OBJECT (task, "Starting task %p", task);
353
354   GST_OBJECT_LOCK (task);
355   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
356     goto no_lock;
357
358   old = task->state;
359   task->state = GST_TASK_STARTED;
360   switch (old) {
361     case GST_TASK_STOPPED:
362     {
363       GstTaskClass *tclass;
364
365       tclass = GST_TASK_GET_CLASS (task);
366
367       /* new task, push on threadpool. We ref before so
368        * that it remains alive while on the threadpool. */
369       gst_object_ref (task);
370       g_static_mutex_lock (&pool_lock);
371       g_thread_pool_push (tclass->pool, task, NULL);
372       g_static_mutex_unlock (&pool_lock);
373       break;
374     }
375     case GST_TASK_PAUSED:
376       /* PAUSE to PLAY, signal */
377       GST_TASK_SIGNAL (task);
378       break;
379     case GST_TASK_STARTED:
380       /* was OK */
381       break;
382   }
383   GST_OBJECT_UNLOCK (task);
384
385   return TRUE;
386
387   /* ERRORS */
388 no_lock:
389   {
390     GST_WARNING_OBJECT (task, "starting task without a lock");
391     GST_OBJECT_UNLOCK (task);
392     g_warning ("starting task without a lock");
393     return FALSE;
394   }
395 }
396
397 /**
398  * gst_task_stop:
399  * @task: The #GstTask to stop
400  *
401  * Stops @task. This method merely schedules the task to stop and
402  * will not wait for the task to have completely stopped. Use
403  * gst_task_join() to stop and wait for completion.
404  *
405  * Returns: TRUE if the task could be stopped.
406  *
407  * MT safe.
408  */
409 gboolean
410 gst_task_stop (GstTask * task)
411 {
412   GstTaskClass *tclass;
413   GstTaskState old;
414
415   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
416
417   tclass = GST_TASK_GET_CLASS (task);
418
419   GST_DEBUG_OBJECT (task, "Stopping task %p", task);
420
421   GST_OBJECT_LOCK (task);
422   old = task->state;
423   task->state = GST_TASK_STOPPED;
424   switch (old) {
425     case GST_TASK_STOPPED:
426       break;
427     case GST_TASK_PAUSED:
428       GST_TASK_SIGNAL (task);
429       break;
430     case GST_TASK_STARTED:
431       break;
432   }
433   GST_OBJECT_UNLOCK (task);
434
435   return TRUE;
436 }
437
438 /**
439  * gst_task_pause:
440  * @task: The #GstTask to pause
441  *
442  * Pauses @task. This method can also be called on a task in the
443  * stopped state, in which case a thread will be started and will remain
444  * in the paused state. This function does not wait for the task to complete
445  * the paused state.
446  *
447  * Returns: TRUE if the task could be paused.
448  *
449  * MT safe.
450  */
451 gboolean
452 gst_task_pause (GstTask * task)
453 {
454   GstTaskState old;
455
456   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
457
458   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
459
460   GST_OBJECT_LOCK (task);
461   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
462     goto no_lock;
463
464   old = task->state;
465   task->state = GST_TASK_PAUSED;
466   switch (old) {
467     case GST_TASK_STOPPED:
468     {
469       GstTaskClass *tclass;
470
471       tclass = GST_TASK_GET_CLASS (task);
472
473       gst_object_ref (task);
474       g_static_mutex_lock (&pool_lock);
475       g_thread_pool_push (tclass->pool, task, NULL);
476       g_static_mutex_unlock (&pool_lock);
477       break;
478     }
479     case GST_TASK_PAUSED:
480       break;
481     case GST_TASK_STARTED:
482       break;
483   }
484   GST_OBJECT_UNLOCK (task);
485
486   return TRUE;
487
488   /* ERRORS */
489 no_lock:
490   {
491     GST_WARNING_OBJECT (task, "pausing task without a lock");
492     GST_OBJECT_UNLOCK (task);
493     g_warning ("pausing task without a lock");
494     return FALSE;
495   }
496 }
497
498 /**
499  * gst_task_join:
500  * @task: The #GstTask to join
501  *
502  * Joins @task. After this call, it is safe to unref the task
503  * and clean up the lock set with gst_task_set_lock().
504  *
505  * The task will automatically be stopped with this call.
506  *
507  * This function cannot be called from within a task function as this
508  * will cause a deadlock.
509  *
510  * Returns: TRUE if the task could be joined.
511  *
512  * MT safe.
513  */
514 gboolean
515 gst_task_join (GstTask * task)
516 {
517   GThread *tself;
518
519   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
520
521   tself = g_thread_self ();
522
523   GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
524
525   GST_OBJECT_LOCK (task);
526   if (tself == task->abidata.ABI.thread)
527     goto joining_self;
528   task->state = GST_TASK_STOPPED;
529   GST_TASK_SIGNAL (task);
530   while (task->running)
531     GST_TASK_WAIT (task);
532   GST_OBJECT_UNLOCK (task);
533
534   GST_DEBUG_OBJECT (task, "Joined task %p", task);
535
536   return TRUE;
537
538   /* ERRORS */
539 joining_self:
540   {
541     GST_WARNING_OBJECT (task, "trying to join task from its thread");
542     GST_OBJECT_UNLOCK (task);
543     g_warning ("trying to join task %p from its thread would deadlock", task);
544     return FALSE;
545   }
546 }