/*
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *                    2005 Wim Taymans <wim@fluendo.com>
 *
 * gsttask.c: Streaming tasks
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
/**
 * SECTION:gsttask
 * @short_description: Abstraction of GStreamer streaming threads.
 * @see_also: #GstElement, #GstPad
 *
 * #GstTask is used by #GstElement and #GstPad to provide the data passing
 * threads in a #GstPipeline.
 *
 * A #GstPad will typically start a #GstTask to push or pull data to/from the
 * peer pads. Most source elements start a #GstTask to push data. In some cases
 * a demuxer element can start a #GstTask to pull data from a peer element. This
 * is typically done when the demuxer can perform random access on the upstream
 * peer element for improved performance.
 *
 * Although convenience functions exist on #GstPad to start/pause/stop tasks, it
 * might sometimes be needed to create a #GstTask manually if it is not related to
 * a #GstPad.
 *
 * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
 * gst_task_set_lock().
 *
 * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
 * and gst_task_stop() respectively.
 *
 * A #GstTask will repeatedly call the #GstTaskFunction with the user data
 * that was provided when creating the task with gst_task_create(). Before calling
 * the function it will acquire the provided lock.
 *
 * Stopping a task with gst_task_stop() will not immediately make sure the task is
 * not running anymore. Use gst_task_join() to make sure the task is completely
 * stopped and the thread is stopped.
 *
 * After creating a #GstTask, use gst_object_unref() to free its resources. This can
 * only be done if the task is not running anymore.
 *
 * Last reviewed on 2006-02-13 (0.10.4)
 */
61 #include "gst_private.h"
66 GST_DEBUG_CATEGORY_STATIC (task_debug);
67 #define GST_CAT_DEFAULT (task_debug)
69 static void gst_task_class_init (GstTaskClass * klass);
70 static void gst_task_init (GstTask * task);
71 static void gst_task_finalize (GObject * object);
73 static void gst_task_func (GstTask * task, GstTaskClass * tclass);
75 static GstObjectClass *parent_class = NULL;
77 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
81 GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \
84 G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init);
87 gst_task_class_init (GstTaskClass * klass)
89 GObjectClass *gobject_class;
91 gobject_class = (GObjectClass *) klass;
93 parent_class = g_type_class_peek_parent (klass);
95 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
97 klass->pool = g_thread_pool_new (
98 (GFunc) gst_task_func, klass, -1, FALSE, NULL);
102 gst_task_init (GstTask * task)
104 task->running = FALSE;
105 task->abidata.ABI.thread = NULL;
107 task->cond = g_cond_new ();
108 task->state = GST_TASK_STOPPED;
112 gst_task_finalize (GObject * object)
114 GstTask *task = GST_TASK (object);
116 GST_DEBUG ("task %p finalize", task);
118 /* task thread cannot be running here since it holds a ref
119 * to the task so that the finalize could not have happened */
120 g_cond_free (task->cond);
123 G_OBJECT_CLASS (parent_class)->finalize (object);
127 gst_task_func (GstTask * task, GstTaskClass * tclass)
129 GStaticRecMutex *lock;
132 tself = g_thread_self ();
134 GST_DEBUG ("Entering task %p, thread %p", task, tself);
136 /* we have to grab the lock to get the mutex. We also
137 * mark our state running so that nobody can mess with
139 GST_OBJECT_LOCK (task);
140 if (task->state == GST_TASK_STOPPED)
142 lock = GST_TASK_GET_LOCK (task);
143 if (G_UNLIKELY (lock == NULL))
145 task->abidata.ABI.thread = tself;
146 GST_OBJECT_UNLOCK (task);
148 /* locking order is TASK_LOCK, LOCK */
149 g_static_rec_mutex_lock (lock);
150 GST_OBJECT_LOCK (task);
151 while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
152 while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
155 t = g_static_rec_mutex_unlock_full (lock);
157 g_warning ("wrong STREAM_LOCK count %d", t);
159 GST_TASK_SIGNAL (task);
160 GST_TASK_WAIT (task);
161 GST_OBJECT_UNLOCK (task);
162 /* locking order.. */
164 g_static_rec_mutex_lock_full (lock, t);
166 GST_OBJECT_LOCK (task);
167 if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
170 GST_OBJECT_UNLOCK (task);
172 task->func (task->data);
174 GST_OBJECT_LOCK (task);
177 GST_OBJECT_UNLOCK (task);
178 g_static_rec_mutex_unlock (lock);
180 GST_OBJECT_LOCK (task);
181 task->abidata.ABI.thread = NULL;
184 /* now we allow messing with the lock again by setting the running flag to
185 * FALSE. Together with the SIGNAL this is the sign for the _join() to
187 * Note that we still have not dropped the final ref on the task. We could
188 * check here if there is a pending join() going on and drop the last ref
189 * before releasing the lock as we can be sure that a ref is held by the
190 * caller of the join(). */
191 task->running = FALSE;
192 GST_TASK_SIGNAL (task);
193 GST_OBJECT_UNLOCK (task);
195 GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
197 gst_object_unref (task);
202 g_warning ("starting task without a lock");
208 * gst_task_cleanup_all:
210 * Wait for all tasks to be stopped. This is mainly used internally
211 * to ensure proper cleanup of internal data structures in test suites.
216 gst_task_cleanup_all (void)
220 if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
221 g_static_mutex_lock (&pool_lock);
223 /* Shut down all the threads, we still process the ones scheduled
224 * because the unref happens in the thread function.
225 * Also wait for currently running ones to finish. */
226 g_thread_pool_free (klass->pool, FALSE, TRUE);
227 /* create new pool, so we can still do something after this
229 klass->pool = g_thread_pool_new (
230 (GFunc) gst_task_func, klass, -1, FALSE, NULL);
232 g_static_mutex_unlock (&pool_lock);
238 * @func: The #GstTaskFunction to use
239 * @data: User data to pass to @func
241 * Create a new Task that will repeatedly call the provided @func
242 * with @data as a parameter. Typically the task will run in
245 * The function cannot be changed after the task has been created. You
246 * must create a new #GstTask to change the function.
248 * Returns: A new #GstTask.
253 gst_task_create (GstTaskFunction func, gpointer data)
257 task = g_object_new (GST_TYPE_TASK, NULL);
261 GST_DEBUG ("Created task %p", task);
268 * @task: The #GstTask to use
269 * @mutex: The #GMutex to use
271 * Set the mutex used by the task. The mutex will be acquired before
272 * calling the #GstTaskFunction.
274 * This function has to be called before calling gst_task_pause() or
280 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
282 GST_OBJECT_LOCK (task);
283 if (G_UNLIKELY (task->running))
285 GST_TASK_GET_LOCK (task) = mutex;
286 GST_OBJECT_UNLOCK (task);
293 GST_OBJECT_UNLOCK (task);
294 g_warning ("cannot call set_lock on a running task");
300 * gst_task_get_state:
301 * @task: The #GstTask to query
303 * Get the current state of the task.
305 * Returns: The #GstTaskState of the task
310 gst_task_get_state (GstTask * task)
314 g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
316 GST_OBJECT_LOCK (task);
317 result = task->state;
318 GST_OBJECT_UNLOCK (task);
323 /* make sure the task is running and start a thread if it's not.
324 * This function must be called with the task LOCK. */
326 start_task (GstTask * task)
329 GstTaskClass *tclass;
330 GError *error = NULL;
332 /* new task, We ref before so that it remains alive while
333 * the thread is running. */
334 gst_object_ref (task);
335 /* mark task as running so that a join will wait until we schedule
336 * and exit the task function. */
337 task->running = TRUE;
339 tclass = GST_TASK_GET_CLASS (task);
341 /* push on the thread pool */
342 g_static_mutex_lock (&pool_lock);
343 g_thread_pool_push (tclass->pool, task, &error);
344 g_static_mutex_unlock (&pool_lock);
347 g_warning ("failed to create thread: %s", error->message);
348 g_error_free (error);
356 * gst_task_set_state:
358 * @state: the new task state
360 * Sets the state of @task to @state.
362 * The @task must have a lock associated with it using
363 * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
364 * this function will return %FALSE.
366 * Returns: %TRUE if the state could be changed.
373 gst_task_set_state (GstTask * task, GstTaskState state)
378 g_return_val_if_fail (GST_IS_TASK (task), FALSE);
380 GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state);
382 GST_OBJECT_LOCK (task);
383 if (state != GST_TASK_STOPPED)
384 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
387 /* if the state changed, do our thing */
392 case GST_TASK_STOPPED:
393 /* If the task already has a thread scheduled we don't have to do
395 if (G_UNLIKELY (!task->running))
396 res = start_task (task);
398 case GST_TASK_PAUSED:
399 /* when we are paused, signal to go to the new state */
400 GST_TASK_SIGNAL (task);
402 case GST_TASK_STARTED:
403 /* if we were started, we'll go to the new state after the next
408 GST_OBJECT_UNLOCK (task);
415 GST_WARNING_OBJECT (task, "state %d set on task without a lock", state);
416 GST_OBJECT_UNLOCK (task);
417 g_warning ("task without a lock can't be set to state %d", state);
424 * @task: The #GstTask to start
426 * Starts @task. The @task must have a lock associated with it using
427 * gst_task_set_lock() or this function will return %FALSE.
429 * Returns: %TRUE if the task could be started.
434 gst_task_start (GstTask * task)
436 return gst_task_set_state (task, GST_TASK_STARTED);
441 * @task: The #GstTask to stop
443 * Stops @task. This method merely schedules the task to stop and
444 * will not wait for the task to have completely stopped. Use
445 * gst_task_join() to stop and wait for completion.
447 * Returns: %TRUE if the task could be stopped.
452 gst_task_stop (GstTask * task)
454 return gst_task_set_state (task, GST_TASK_STOPPED);
459 * @task: The #GstTask to pause
461 * Pauses @task. This method can also be called on a task in the
462 * stopped state, in which case a thread will be started and will remain
463 * in the paused state. This function does not wait for the task to complete
466 * Returns: %TRUE if the task could be paused.
471 gst_task_pause (GstTask * task)
473 return gst_task_set_state (task, GST_TASK_PAUSED);
478 * @task: The #GstTask to join
480 * Joins @task. After this call, it is safe to unref the task
481 * and clean up the lock set with gst_task_set_lock().
483 * The task will automatically be stopped with this call.
485 * This function cannot be called from within a task function as this
486 * would cause a deadlock. The function will detect this and print a
489 * Returns: %TRUE if the task could be joined.
494 gst_task_join (GstTask * task)
498 g_return_val_if_fail (GST_IS_TASK (task), FALSE);
500 tself = g_thread_self ();
502 GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
504 /* we don't use a real thread join here because we are using
506 GST_OBJECT_LOCK (task);
507 if (G_UNLIKELY (tself == task->abidata.ABI.thread))
509 task->state = GST_TASK_STOPPED;
510 /* signal the state change for when it was blocked in PAUSED. */
511 GST_TASK_SIGNAL (task);
512 /* we set the running flag when pushing the task on the thread pool.
513 * This means that the task function might not be called when we try
514 * to join it here. */
515 while (G_LIKELY (task->running))
516 GST_TASK_WAIT (task);
517 GST_OBJECT_UNLOCK (task);
519 GST_DEBUG_OBJECT (task, "Joined task %p", task);
526 GST_WARNING_OBJECT (task, "trying to join task from its thread");
527 GST_OBJECT_UNLOCK (task);
528 g_warning ("\nTrying to join task %p from its thread would deadlock.\n"
529 "You cannot change the state of an element from its streaming\n"
530 "thread. Use g_idle_add() or post a GstMessage on the bus to\n"
531 "schedule the state change from the main thread.\n", task);