From 1e40e471d9b2296470afd259656431af1940a34d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 24 Aug 2005 20:49:53 +0000 Subject: [PATCH] check/generic/states.c: Make sure all tasks are stopped. Original commit message from CVS: * check/generic/states.c: (GST_START_TEST): Make sure all tasks are stopped. * check/gst/gstbin.c: (GST_START_TEST): Unref after usage for proper valgrinding. * gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task): Really wait for the task to stop before destroying the mutex. * gst/gstqueue.c: (gst_queue_sink_activate_push), (gst_queue_src_activate_push): Small cleanups. Don't stop the task when we did not start it. * gst/gsttask.c: (gst_task_get_type), (gst_task_init), (gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock), (gst_task_get_state), (gst_task_start), (gst_task_pause), (gst_task_join): * gst/gsttask.h: Protect the stream lock with the object lock. Disallow setting the stream lock when running. Add cleanup_all to wait for the threadpool to finish. Remove code to autoallocate a mutex if none was provided. Add _join() to wait for a task to stop. Protect the thread pool with a global lock. --- ChangeLog | 29 +++++++++ check/generic/states.c | 2 + check/gst/gstbin.c | 4 ++ gst/gstpad.c | 20 +++++- gst/gstqueue.c | 17 +++-- gst/gsttask.c | 150 ++++++++++++++++++++++++++++++++++++++----- gst/gsttask.h | 14 ++-- plugins/elements/gstqueue.c | 17 +++-- tests/check/generic/states.c | 2 + tests/check/gst/gstbin.c | 4 ++ 10 files changed, 219 insertions(+), 40 deletions(-) diff --git a/ChangeLog b/ChangeLog index e483ed8..98eed42 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,34 @@ 2005-08-24 Wim Taymans + * check/generic/states.c: (GST_START_TEST): + Make sure all tasks are stopped. + + * check/gst/gstbin.c: (GST_START_TEST): + Unref after usage for proper valgrinding. + + * gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task): + Really wait for the task to stop before destroying the + mutex. + + * gst/gstqueue.c: (gst_queue_sink_activate_push), + (gst_queue_src_activate_push): + Small cleanups. Don't stop the task when we did not start + it. + + * gst/gsttask.c: (gst_task_get_type), (gst_task_init), + (gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock), + (gst_task_get_state), (gst_task_start), (gst_task_pause), + (gst_task_join): + * gst/gsttask.h: + Protect the stream lock with the object lock. + Disallow setting the stream lock when running. + Add cleanup_all to wait for the threadpool to finish. + Remove code to autoallocate a mutex if none was provided. + Add _join() to wait for a task to stop. + Protect the thread pool with a global lock. + +2005-08-24 Wim Taymans + * gst/base/gstbasesink.c: (gst_base_sink_handle_object), (gst_base_sink_get_times), (gst_base_sink_do_sync), (gst_base_sink_handle_buffer), (gst_base_sink_change_state): diff --git a/check/generic/states.c b/check/generic/states.c index 3b9b961..a441686 100644 --- a/check/generic/states.c +++ b/check/generic/states.c @@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes) gst_element_set_state (element, GST_STATE_PAUSED); gst_element_set_state (element, GST_STATE_NULL); gst_object_unref (GST_OBJECT (element)); + + gst_task_cleanup_all (); } } diff --git a/check/gst/gstbin.c b/check/gst/gstbin.c index f39f5c0..fffada3 100644 --- a/check/gst/gstbin.c +++ b/check/gst/gstbin.c @@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked) /* check if pads really are linked */ fail_unless (gst_pad_is_linked (srcpad)); fail_unless (gst_pad_is_linked (sinkpad)); + + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + gst_object_unref (pipeline); } GST_END_TEST; diff --git a/gst/gstpad.c b/gst/gstpad.c index d61418b..21ce573 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -295,6 +295,14 @@ static void gst_pad_finalize (GObject * object) { GstPad *pad = GST_PAD (object); + GstTask *task; + + /* in case the task is still around, clean it up */ + if ((task = GST_PAD_TASK (pad))) { + gst_task_join (task); + GST_PAD_TASK (pad) = NULL; + gst_object_unref (task); + } if (pad->stream_rec_lock) { g_static_rec_mutex_free (pad->stream_rec_lock); @@ -1878,8 +1886,8 @@ gst_pad_default_fixate (GQuark field_id, const GValue * value, gpointer data) * @pad: a #GstPad to fixate * @caps: the #GstCaps to fixate * - * Fixate a caps on the given pad. Modifies the caps in place, so you should be - * that the caps are actually writable (see gst_caps_make_writable()). + * Fixate a caps on the given pad. Modifies the caps in place, so you should + * make sure that the caps are actually writable (see gst_caps_make_writable()). */ void gst_pad_fixate_caps (GstPad * pad, GstCaps * caps) @@ -3759,7 +3767,11 @@ no_task: * @pad: the #GstPad to stop the task of * * Stop the task of @pad. This function will also make sure that the - * function executed by the task will effectively stop. + * function executed by the task will effectively stop if not called + * from the GstTaskFunction. + * + * This function will deadlock if called from the GstTaskFunction of + * the task. Use #gst_task_pause() instead. * * Returns: a TRUE if the task could be stopped or FALSE when the pad * has no task. @@ -3782,6 +3794,8 @@ gst_pad_stop_task (GstPad * pad) GST_STREAM_LOCK (pad); GST_STREAM_UNLOCK (pad); + gst_task_join (task); + gst_object_unref (task); return TRUE; diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 6286794..fe42421 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active) { - gboolean result = FALSE; + gboolean result = TRUE; GstQueue *queue; queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; - result = TRUE; + GST_QUEUE_MUTEX_UNLOCK (queue); } else { - /* step 1, unblock chain and loop functions */ + /* step 1, unblock chain function */ GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); - g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK (queue); - /* step 2, make sure streaming finishes */ - result = gst_pad_stop_task (pad); + /* and make sure the chain function finishes */ + GST_STREAM_LOCK (pad); + GST_STREAM_UNLOCK (pad); } + gst_object_unref (queue); return result; @@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) } GST_QUEUE_MUTEX_UNLOCK (queue); } else { - /* step 1, unblock chain and loop functions */ + /* step 1, unblock loop function */ GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; + /* the item add signal will unblock */ g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK (queue); diff --git a/gst/gsttask.c b/gst/gsttask.c index fec1f8e..23de33f 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -25,6 +25,9 @@ #include "gstinfo.h" #include "gsttask.h" +GST_DEBUG_CATEGORY (task_debug); +#define GST_CAT_DEFAULT (task_debug) + static void gst_task_class_init (GstTaskClass * klass); static void gst_task_init (GstTask * task); static void gst_task_finalize (GObject * object); @@ -33,6 +36,8 @@ static void gst_task_func (GstTask * task, GstTaskClass * tclass); static GstObjectClass *parent_class = NULL; +static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; + GType gst_task_get_type (void) { @@ -54,6 +59,8 @@ gst_task_get_type (void) _gst_task_type = g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0); + + GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); } return _gst_task_type; } @@ -76,6 +83,7 @@ gst_task_class_init (GstTaskClass * klass) static void gst_task_init (GstTask * task) { + task->running = FALSE; task->lock = NULL; task->cond = g_cond_new (); task->state = GST_TASK_STOPPED; @@ -97,16 +105,28 @@ gst_task_finalize (GObject * object) static void gst_task_func (GstTask * task, GstTaskClass * tclass) { + GStaticRecMutex *lock; + GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ()); + /* we have to grab the lock to get the mutex. We also + * mark our state running so that nobody can mess with + * the mutex. */ + GST_LOCK (task); + if (task->state == GST_TASK_STOPPED) + goto exit; + lock = GST_TASK_GET_LOCK (task); + task->running = TRUE; + GST_UNLOCK (task); + /* locking order is TASK_LOCK, LOCK */ - GST_TASK_LOCK (task); + g_static_rec_mutex_lock (lock); GST_LOCK (task); while (G_LIKELY (task->state != GST_TASK_STOPPED)) { while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) { gint t; - t = GST_TASK_UNLOCK_FULL (task); + t = g_static_rec_mutex_unlock_full (lock); if (t <= 0) { g_warning ("wrong STREAM_LOCK count %d", t); } @@ -115,7 +135,7 @@ gst_task_func (GstTask * task, GstTaskClass * tclass) GST_UNLOCK (task); /* locking order.. */ if (t > 0) - GST_TASK_LOCK_FULL (task, t); + g_static_rec_mutex_lock_full (lock, t); GST_LOCK (task); if (task->state == GST_TASK_STOPPED) @@ -129,7 +149,14 @@ gst_task_func (GstTask * task, GstTaskClass * tclass) } done: GST_UNLOCK (task); - GST_TASK_UNLOCK (task); + g_static_rec_mutex_unlock (lock); + + /* now we allow messing with the lock again */ + GST_LOCK (task); + task->running = FALSE; +exit: + GST_TASK_SIGNAL (task); + GST_UNLOCK (task); GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ()); @@ -137,6 +164,35 @@ done: } /** + * gst_task_cleanup_all: + * + * Wait for all tasks to be stopped. This is mainly used internally + * to ensure proper cleanup of internal datastructures in testsuites. + * + * MT safe. + */ +void +gst_task_cleanup_all (void) +{ + GstTaskClass *klass; + + if ((klass = g_type_class_peek (GST_TYPE_TASK))) { + g_static_mutex_lock (&pool_lock); + if (klass->pool) { + /* Shut down all the threads, we still process the ones scheduled + * because the unref happens in the thread function. + * Also wait for currently running ones to finish. */ + g_thread_pool_free (klass->pool, FALSE, TRUE); + /* create new pool, so we can still do something after this + * call. */ + klass->pool = g_thread_pool_new ( + (GFunc) gst_task_func, klass, -1, FALSE, NULL); + } + g_static_mutex_unlock (&pool_lock); + } +} + +/** * gst_task_create: * @func: The #GstTaskFunction to use * @data: User data to pass to @func @@ -176,8 +232,19 @@ void gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex) { GST_LOCK (task); - task->lock = mutex; + if (task->running) + goto is_running; + GST_TASK_GET_LOCK (task) = mutex; GST_UNLOCK (task); + + return; + + /* ERRORS */ +is_running: + { + g_warning ("cannot call set_lock on a running task"); + GST_UNLOCK (task); + } } @@ -218,39 +285,51 @@ gst_task_get_state (GstTask * task) gboolean gst_task_start (GstTask * task) { - GstTaskClass *tclass; GstTaskState old; - GStaticRecMutex *lock; g_return_val_if_fail (GST_IS_TASK (task), FALSE); - tclass = GST_TASK_GET_CLASS (task); - GST_DEBUG_OBJECT (task, "Starting task %p", task); GST_LOCK (task); - if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) { - lock = g_new (GStaticRecMutex, 1); - g_static_rec_mutex_init (lock); - GST_TASK_GET_LOCK (task) = lock; - } + if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) + goto no_lock; old = task->state; task->state = GST_TASK_STARTED; switch (old) { case GST_TASK_STOPPED: + { + GstTaskClass *tclass; + + tclass = GST_TASK_GET_CLASS (task); + + /* new task, push on threadpool. We ref before so + * that it remains alive while on the threadpool. */ gst_object_ref (task); + g_static_mutex_lock (&pool_lock); g_thread_pool_push (tclass->pool, task, NULL); + g_static_mutex_unlock (&pool_lock); break; + } case GST_TASK_PAUSED: + /* PAUSE to PLAY, signal */ GST_TASK_SIGNAL (task); break; case GST_TASK_STARTED: + /* was OK */ break; } GST_UNLOCK (task); return TRUE; + + /* ERRORS */ +no_lock: + { + g_warning ("starting task without a lock"); + return FALSE; + } } /** @@ -305,13 +384,10 @@ gst_task_stop (GstTask * task) gboolean gst_task_pause (GstTask * task) { - GstTaskClass *tclass; GstTaskState old; g_return_val_if_fail (GST_IS_TASK (task), FALSE); - tclass = GST_TASK_GET_CLASS (task); - GST_DEBUG_OBJECT (task, "Pausing task %p", task); GST_LOCK (task); @@ -319,9 +395,17 @@ gst_task_pause (GstTask * task) task->state = GST_TASK_PAUSED; switch (old) { case GST_TASK_STOPPED: + { + GstTaskClass *tclass; + + tclass = GST_TASK_GET_CLASS (task); + gst_object_ref (task); + g_static_mutex_lock (&pool_lock); g_thread_pool_push (tclass->pool, task, NULL); + g_static_mutex_unlock (&pool_lock); break; + } case GST_TASK_PAUSED: break; case GST_TASK_STARTED: @@ -331,3 +415,35 @@ gst_task_pause (GstTask * task) return TRUE; } + +/** + * gst_task_join: + * @task: The #GstTask to join + * + * Joins @task. After this call, it is safe to unref the task + * and clean up the lock set with #gst_task_set_lock(). + * + * The task will automatically be stopped with this call. + * + * This function cannot be called from within a task function. + * + * Returns: TRUE if the task could be joined. + * + * MT safe. + */ +gboolean +gst_task_join (GstTask * task) +{ + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + GST_DEBUG_OBJECT (task, "Joining task %p", task); + + GST_LOCK (task); + task->state = GST_TASK_STOPPED; + GST_TASK_SIGNAL (task); + while (task->running) + GST_TASK_WAIT (task); + GST_UNLOCK (task); + + return TRUE; +} diff --git a/gst/gsttask.h b/gst/gsttask.h index c1210c3..33c26b0 100644 --- a/gst/gsttask.h +++ b/gst/gsttask.h @@ -55,10 +55,6 @@ typedef enum { #define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task)) #define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock) -#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task)) -#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task)) -#define GST_TASK_UNLOCK_FULL(task) g_static_rec_mutex_unlock_full(GST_TASK_GET_LOCK(task)) -#define GST_TASK_LOCK_FULL(task,t) g_static_rec_mutex_lock_full(GST_TASK_GET_LOCK(task),(t)) struct _GstTask { GstObject object; @@ -69,8 +65,10 @@ struct _GstTask { GStaticRecMutex *lock; - GstTaskFunction func; - gpointer data; + GstTaskFunction func; + gpointer data; + + gboolean running; /*< private >*/ gpointer _gst_reserved[GST_PADDING]; @@ -86,6 +84,8 @@ struct _GstTaskClass { gpointer _gst_reserved[GST_PADDING]; }; +void gst_task_cleanup_all (void); + GType gst_task_get_type (void); GstTask* gst_task_create (GstTaskFunction func, gpointer data); @@ -97,6 +97,8 @@ gboolean gst_task_start (GstTask *task); gboolean gst_task_stop (GstTask *task); gboolean gst_task_pause (GstTask *task); +gboolean gst_task_join (GstTask *task); + G_END_DECLS #endif /* __GST_TASK_H__ */ diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 6286794..fe42421 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query) static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active) { - gboolean result = FALSE; + gboolean result = TRUE; GstQueue *queue; queue = GST_QUEUE (gst_pad_get_parent (pad)); if (active) { + GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; - result = TRUE; + GST_QUEUE_MUTEX_UNLOCK (queue); } else { - /* step 1, unblock chain and loop functions */ + /* step 1, unblock chain function */ GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; gst_queue_locked_flush (queue); - g_cond_signal (queue->item_del); GST_QUEUE_MUTEX_UNLOCK (queue); - /* step 2, make sure streaming finishes */ - result = gst_pad_stop_task (pad); + /* and make sure the chain function finishes */ + GST_STREAM_LOCK (pad); + GST_STREAM_UNLOCK (pad); } + gst_object_unref (queue); return result; @@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active) } GST_QUEUE_MUTEX_UNLOCK (queue); } else { - /* step 1, unblock chain and loop functions */ + /* step 1, unblock loop function */ GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_WRONG_STATE; + /* the item add signal will unblock */ g_cond_signal (queue->item_add); GST_QUEUE_MUTEX_UNLOCK (queue); diff --git a/tests/check/generic/states.c b/tests/check/generic/states.c index 3b9b961..a441686 100644 --- a/tests/check/generic/states.c +++ b/tests/check/generic/states.c @@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes) gst_element_set_state (element, GST_STATE_PAUSED); gst_element_set_state (element, GST_STATE_NULL); gst_object_unref (GST_OBJECT (element)); + + gst_task_cleanup_all (); } } diff --git a/tests/check/gst/gstbin.c b/tests/check/gst/gstbin.c index f39f5c0..fffada3 100644 --- a/tests/check/gst/gstbin.c +++ b/tests/check/gst/gstbin.c @@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked) /* check if pads really are linked */ fail_unless (gst_pad_is_linked (srcpad)); fail_unless (gst_pad_is_linked (sinkpad)); + + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + gst_object_unref (pipeline); } GST_END_TEST; -- 2.7.4