GstTask: unify task state functions
[platform/upstream/gstreamer.git] / gst / gsttask.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gsttask.c: Streaming tasks
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gsttask
25  * @short_description: Abstraction of GStreamer streaming threads.
26  * @see_also: #GstElement, #GstPad
27  *
28  * #GstTask is used by #GstElement and #GstPad to provide the data passing
29  * threads in a #GstPipeline.
30  *
31  * A #GstPad will typically start a #GstTask to push or pull data to/from the
32  * peer pads. Most source elements start a #GstTask to push data. In some cases
33  * a demuxer element can start a #GstTask to pull data from a peer element. This
34  * is typically done when the demuxer can perform random access on the upstream
35  * peer element for improved performance.
36  *
37  * Although convenience functions exist on #GstPad to start/pause/stop tasks, it 
38  * might sometimes be needed to create a #GstTask manually if it is not related to
39  * a #GstPad.
40  *
41  * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
42  * gst_task_set_lock().
43  *
44  * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
45  * and gst_task_stop() respectively.
46  *
47  * A #GstTask will repeatedly call the #GstTaskFunction with the user data
48  * that was provided when creating the task with gst_task_create(). Before calling
49  * the function it will acquire the provided lock.
50  *
51  * Stopping a task with gst_task_stop() will not immediately make sure the task is
52  * not running anymore. Use gst_task_join() to make sure the task is completely
53  * stopped and the thread is stopped.
54  *
55  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
56  * only be done it the task is not running anymore.
57  *
58  * Last reviewed on 2006-02-13 (0.10.4)
59  */
60
61 #include "gst_private.h"
62
63 #include "gstinfo.h"
64 #include "gsttask.h"
65
66 GST_DEBUG_CATEGORY_STATIC (task_debug);
67 #define GST_CAT_DEFAULT (task_debug)
68
69 static void gst_task_class_init (GstTaskClass * klass);
70 static void gst_task_init (GstTask * task);
71 static void gst_task_finalize (GObject * object);
72
73 static void gst_task_func (GstTask * task, GstTaskClass * tclass);
74
75 static GstObjectClass *parent_class = NULL;
76
77 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
78
79 #define _do_init \
80 { \
81   GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \
82 }
83
84 G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init);
85
86 static void
87 gst_task_class_init (GstTaskClass * klass)
88 {
89   GObjectClass *gobject_class;
90
91   gobject_class = (GObjectClass *) klass;
92
93   parent_class = g_type_class_peek_parent (klass);
94
95   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
96
97   klass->pool = g_thread_pool_new (
98       (GFunc) gst_task_func, klass, -1, FALSE, NULL);
99 }
100
101 static void
102 gst_task_init (GstTask * task)
103 {
104   task->running = FALSE;
105   task->abidata.ABI.thread = NULL;
106   task->lock = NULL;
107   task->cond = g_cond_new ();
108   task->state = GST_TASK_STOPPED;
109 }
110
111 static void
112 gst_task_finalize (GObject * object)
113 {
114   GstTask *task = GST_TASK (object);
115
116   GST_DEBUG ("task %p finalize", task);
117
118   /* task thread cannot be running here since it holds a ref
119    * to the task so that the finalize could not have happened */
120   g_cond_free (task->cond);
121   task->cond = NULL;
122
123   G_OBJECT_CLASS (parent_class)->finalize (object);
124 }
125
126 static void
127 gst_task_func (GstTask * task, GstTaskClass * tclass)
128 {
129   GStaticRecMutex *lock;
130   GThread *tself;
131
132   tself = g_thread_self ();
133
134   GST_DEBUG ("Entering task %p, thread %p", task, tself);
135
136   /* we have to grab the lock to get the mutex. We also
137    * mark our state running so that nobody can mess with
138    * the mutex. */
139   GST_OBJECT_LOCK (task);
140   if (task->state == GST_TASK_STOPPED)
141     goto exit;
142   lock = GST_TASK_GET_LOCK (task);
143   if (G_UNLIKELY (lock == NULL))
144     goto no_lock;
145   task->abidata.ABI.thread = tself;
146   GST_OBJECT_UNLOCK (task);
147
148   /* locking order is TASK_LOCK, LOCK */
149   g_static_rec_mutex_lock (lock);
150   GST_OBJECT_LOCK (task);
151   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
152     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
153       gint t;
154
155       t = g_static_rec_mutex_unlock_full (lock);
156       if (t <= 0) {
157         g_warning ("wrong STREAM_LOCK count %d", t);
158       }
159       GST_TASK_SIGNAL (task);
160       GST_TASK_WAIT (task);
161       GST_OBJECT_UNLOCK (task);
162       /* locking order.. */
163       if (t > 0)
164         g_static_rec_mutex_lock_full (lock, t);
165
166       GST_OBJECT_LOCK (task);
167       if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
168         goto done;
169     }
170     GST_OBJECT_UNLOCK (task);
171
172     task->func (task->data);
173
174     GST_OBJECT_LOCK (task);
175   }
176 done:
177   GST_OBJECT_UNLOCK (task);
178   g_static_rec_mutex_unlock (lock);
179
180   GST_OBJECT_LOCK (task);
181   task->abidata.ABI.thread = NULL;
182
183 exit:
184   /* now we allow messing with the lock again by setting the running flag to
185    * FALSE. Together with the SIGNAL this is the sign for the _join() to 
186    * complete. 
187    * Note that we still have not dropped the final ref on the task. We could
188    * check here if there is a pending join() going on and drop the last ref
189    * before releasing the lock as we can be sure that a ref is held by the
190    * caller of the join(). */
191   task->running = FALSE;
192   GST_TASK_SIGNAL (task);
193   GST_OBJECT_UNLOCK (task);
194
195   GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
196
197   gst_object_unref (task);
198   return;
199
200 no_lock:
201   {
202     g_warning ("starting task without a lock");
203     goto exit;
204   }
205 }
206
207 /**
208  * gst_task_cleanup_all:
209  *
210  * Wait for all tasks to be stopped. This is mainly used internally
211  * to ensure proper cleanup of internal data structures in test suites.
212  *
213  * MT safe.
214  */
215 void
216 gst_task_cleanup_all (void)
217 {
218   GstTaskClass *klass;
219
220   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
221     g_static_mutex_lock (&pool_lock);
222     if (klass->pool) {
223       /* Shut down all the threads, we still process the ones scheduled
224        * because the unref happens in the thread function.
225        * Also wait for currently running ones to finish. */
226       g_thread_pool_free (klass->pool, FALSE, TRUE);
227       /* create new pool, so we can still do something after this
228        * call. */
229       klass->pool = g_thread_pool_new (
230           (GFunc) gst_task_func, klass, -1, FALSE, NULL);
231     }
232     g_static_mutex_unlock (&pool_lock);
233   }
234 }
235
236 /**
237  * gst_task_create:
238  * @func: The #GstTaskFunction to use
239  * @data: User data to pass to @func
240  *
241  * Create a new Task that will repeatedly call the provided @func
242  * with @data as a parameter. Typically the task will run in
243  * a new thread.
244  *
245  * The function cannot be changed after the task has been created. You
246  * must create a new #GstTask to change the function.
247  *
248  * Returns: A new #GstTask.
249  *
250  * MT safe.
251  */
252 GstTask *
253 gst_task_create (GstTaskFunction func, gpointer data)
254 {
255   GstTask *task;
256
257   task = g_object_new (GST_TYPE_TASK, NULL);
258   task->func = func;
259   task->data = data;
260
261   GST_DEBUG ("Created task %p", task);
262
263   return task;
264 }
265
266 /**
267  * gst_task_set_lock:
268  * @task: The #GstTask to use
269  * @mutex: The #GMutex to use
270  *
271  * Set the mutex used by the task. The mutex will be acquired before
272  * calling the #GstTaskFunction.
273  *
274  * This function has to be called before calling gst_task_pause() or
275  * gst_task_start().
276  *
277  * MT safe.
278  */
279 void
280 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
281 {
282   GST_OBJECT_LOCK (task);
283   if (G_UNLIKELY (task->running))
284     goto is_running;
285   GST_TASK_GET_LOCK (task) = mutex;
286   GST_OBJECT_UNLOCK (task);
287
288   return;
289
290   /* ERRORS */
291 is_running:
292   {
293     GST_OBJECT_UNLOCK (task);
294     g_warning ("cannot call set_lock on a running task");
295   }
296 }
297
298
299 /**
300  * gst_task_get_state:
301  * @task: The #GstTask to query
302  *
303  * Get the current state of the task.
304  *
305  * Returns: The #GstTaskState of the task
306  *
307  * MT safe.
308  */
309 GstTaskState
310 gst_task_get_state (GstTask * task)
311 {
312   GstTaskState result;
313
314   g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
315
316   GST_OBJECT_LOCK (task);
317   result = task->state;
318   GST_OBJECT_UNLOCK (task);
319
320   return result;
321 }
322
323 /* make sure the task is running and start a thread if it's not.
324  * This function must be called with the task LOCK. */
325 static gboolean
326 start_task (GstTask * task)
327 {
328   gboolean res = TRUE;
329   GstTaskClass *tclass;
330   GError *error = NULL;
331
332   /* new task, We ref before so that it remains alive while
333    * the thread is running. */
334   gst_object_ref (task);
335   /* mark task as running so that a join will wait until we schedule
336    * and exit the task function. */
337   task->running = TRUE;
338
339   tclass = GST_TASK_GET_CLASS (task);
340
341   /* push on the thread pool */
342   g_static_mutex_lock (&pool_lock);
343   g_thread_pool_push (tclass->pool, task, &error);
344   g_static_mutex_unlock (&pool_lock);
345
346   if (error != NULL) {
347     g_warning ("failed to create thread: %s", error->message);
348     g_error_free (error);
349     res = FALSE;
350   }
351   return res;
352 }
353
354
355 /**
356  * gst_task_set_state:
357  * @task: a #GstTask
358  * @state: the new task state
359  *
360  * Sets the state of @task to @state.
361  *
362  * The @task must have a lock associated with it using
363  * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
364  * this function will return %FALSE.
365  *
366  * Returns: %TRUE if the state could be changed.
367  *
368  * Since: 0.10.24
369  *
370  * MT safe.
371  */
372 gboolean
373 gst_task_set_state (GstTask * task, GstTaskState state)
374 {
375   GstTaskState old;
376   gboolean res = TRUE;
377
378   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
379
380   GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state);
381
382   GST_OBJECT_LOCK (task);
383   if (state != GST_TASK_STOPPED)
384     if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
385       goto no_lock;
386
387   /* if the state changed, do our thing */
388   old = task->state;
389   if (old != state) {
390     task->state = state;
391     switch (old) {
392       case GST_TASK_STOPPED:
393         /* If the task already has a thread scheduled we don't have to do
394          * anything. */
395         if (G_UNLIKELY (!task->running))
396           res = start_task (task);
397         break;
398       case GST_TASK_PAUSED:
399         /* when we are paused, signal to go to the new state */
400         GST_TASK_SIGNAL (task);
401         break;
402       case GST_TASK_STARTED:
403         /* if we were started, we'll go to the new state after the next
404          * iteration. */
405         break;
406     }
407   }
408   GST_OBJECT_UNLOCK (task);
409
410   return res;
411
412   /* ERRORS */
413 no_lock:
414   {
415     GST_WARNING_OBJECT (task, "state %d set on task without a lock", state);
416     GST_OBJECT_UNLOCK (task);
417     g_warning ("task without a lock can't be set to state %d", state);
418     return FALSE;
419   }
420 }
421
422 /**
423  * gst_task_start:
424  * @task: The #GstTask to start
425  *
426  * Starts @task. The @task must have a lock associated with it using
427  * gst_task_set_lock() or this function will return %FALSE.
428  *
429  * Returns: %TRUE if the task could be started.
430  *
431  * MT safe.
432  */
433 gboolean
434 gst_task_start (GstTask * task)
435 {
436   return gst_task_set_state (task, GST_TASK_STARTED);
437 }
438
439 /**
440  * gst_task_stop:
441  * @task: The #GstTask to stop
442  *
443  * Stops @task. This method merely schedules the task to stop and
444  * will not wait for the task to have completely stopped. Use
445  * gst_task_join() to stop and wait for completion.
446  *
447  * Returns: %TRUE if the task could be stopped.
448  *
449  * MT safe.
450  */
451 gboolean
452 gst_task_stop (GstTask * task)
453 {
454   return gst_task_set_state (task, GST_TASK_STOPPED);
455 }
456
457 /**
458  * gst_task_pause:
459  * @task: The #GstTask to pause
460  *
461  * Pauses @task. This method can also be called on a task in the
462  * stopped state, in which case a thread will be started and will remain
463  * in the paused state. This function does not wait for the task to complete
464  * the paused state.
465  *
466  * Returns: %TRUE if the task could be paused.
467  *
468  * MT safe.
469  */
470 gboolean
471 gst_task_pause (GstTask * task)
472 {
473   return gst_task_set_state (task, GST_TASK_PAUSED);
474 }
475
476 /**
477  * gst_task_join:
478  * @task: The #GstTask to join
479  *
480  * Joins @task. After this call, it is safe to unref the task
481  * and clean up the lock set with gst_task_set_lock().
482  *
483  * The task will automatically be stopped with this call.
484  *
485  * This function cannot be called from within a task function as this
486  * would cause a deadlock. The function will detect this and print a 
487  * g_warning.
488  *
489  * Returns: %TRUE if the task could be joined.
490  *
491  * MT safe.
492  */
493 gboolean
494 gst_task_join (GstTask * task)
495 {
496   GThread *tself;
497
498   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
499
500   tself = g_thread_self ();
501
502   GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
503
504   /* we don't use a real thread join here because we are using
505    * thread pools */
506   GST_OBJECT_LOCK (task);
507   if (G_UNLIKELY (tself == task->abidata.ABI.thread))
508     goto joining_self;
509   task->state = GST_TASK_STOPPED;
510   /* signal the state change for when it was blocked in PAUSED. */
511   GST_TASK_SIGNAL (task);
512   /* we set the running flag when pushing the task on the thread pool.
513    * This means that the task function might not be called when we try
514    * to join it here. */
515   while (G_LIKELY (task->running))
516     GST_TASK_WAIT (task);
517   GST_OBJECT_UNLOCK (task);
518
519   GST_DEBUG_OBJECT (task, "Joined task %p", task);
520
521   return TRUE;
522
523   /* ERRORS */
524 joining_self:
525   {
526     GST_WARNING_OBJECT (task, "trying to join task from its thread");
527     GST_OBJECT_UNLOCK (task);
528     g_warning ("\nTrying to join task %p from its thread would deadlock.\n"
529         "You cannot change the state of an element from its streaming\n"
530         "thread. Use g_idle_add() or post a GstMessage on the bus to\n"
531         "schedule the state change from the main thread.\n", task);
532     return FALSE;
533   }
534 }