X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fgsttask.c;h=30f25fef8f7799f822af55e783144411f5822010;hb=ce43de86902c4e9c8ed4e9682602664cb9bce2ee;hp=b904fc98308cd73912bfe324237b99520eac30ba;hpb=588dcec8ae5c8d867ce6efa1d8ced1750a865ecc;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/gsttask.c b/gst/gsttask.c index b904fc9..30f25fe 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -16,12 +16,13 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** * SECTION:gsttask + * @title: GstTask * @short_description: Abstraction of GStreamer streaming threads. * @see_also: #GstElement, #GstPad * @@ -38,7 +39,7 @@ * might sometimes be needed to create a #GstTask manually if it is not related to * a #GstPad. * - * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with + * Before the #GstTask can be run, it needs a #GRecMutex that can be set with * gst_task_set_lock(). * * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() @@ -54,24 +55,23 @@ * stopped and the thread is stopped. * * After creating a #GstTask, use gst_object_unref() to free its resources. This can - * only be done it the task is not running anymore. + * only be done when the task is not running anymore. * * Task functions can send a #GstMessage to send out-of-band data to the * application. The application can receive messages from the #GstBus in its * mainloop. * - * For debugging perposes, the task will configure its object name as the thread + * For debugging purposes, the task will configure its object name as the thread * name on Linux. Please note that the object name should be configured before the * task is started; changing the object name after the task has been started, has * no effect on the thread name. - * - * Last reviewed on 2010-03-15 (0.10.29) */ #include "gst_private.h" #include "gstinfo.h" #include "gsttask.h" +#include "glib-compat-private.h" #include @@ -79,24 +79,26 @@ #include #endif +#ifdef HAVE_PTHREAD_SETNAME_NP_WITHOUT_TID +#include +#endif + GST_DEBUG_CATEGORY_STATIC (task_debug); #define GST_CAT_DEFAULT (task_debug) #define SET_TASK_STATE(t,s) (g_atomic_int_set (&GST_TASK_STATE(t), (s))) #define GET_TASK_STATE(t) ((GstTaskState) g_atomic_int_get (&GST_TASK_STATE(t))) -#define GST_TASK_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate)) - struct _GstTaskPrivate { /* callbacks for managing the thread of this task */ - GstTaskThreadCallbacks thr_callbacks; - gpointer thr_user_data; - GDestroyNotify thr_notify; + GstTaskThreadFunc enter_func; + gpointer enter_user_data; + GDestroyNotify enter_notify; - gboolean prio_set; - GThreadPriority priority; + GstTaskThreadFunc leave_func; + gpointer leave_user_data; + GDestroyNotify leave_notify; /* configured pool */ GstTaskPool *pool; @@ -107,6 +109,7 @@ struct _GstTaskPrivate }; #ifdef _MSC_VER +#define WIN32_LEAN_AND_MEAN #include struct _THREADNAME_INFO @@ -129,7 +132,7 @@ SetThreadName (DWORD dwThreadID, LPCSTR szThreadName) __try { RaiseException (0x406D1388, 0, sizeof (info) / sizeof (DWORD), - (DWORD *) & info); + (const ULONG_PTR *) &info); } __except (EXCEPTION_CONTINUE_EXECUTION) { } @@ -140,26 +143,29 @@ static void gst_task_finalize (GObject * object); static void gst_task_func (GstTask * task); -static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; +static GMutex pool_lock; #define _do_init \ { \ GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \ } -G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init); +G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, + G_ADD_PRIVATE (GstTask) _do_init); static void init_klass_pool (GstTaskClass * klass) { - g_static_mutex_lock (&pool_lock); + g_mutex_lock (&pool_lock); if (klass->pool) { gst_task_pool_cleanup (klass->pool); gst_object_unref (klass->pool); } klass->pool = gst_task_pool_new (); + /* Classes are never destroyed so this ref will never be dropped */ + GST_OBJECT_FLAG_SET (klass->pool, GST_OBJECT_FLAG_MAY_BE_LEAKED); gst_task_pool_prepare (klass->pool, NULL); - g_static_mutex_unlock (&pool_lock); + g_mutex_unlock (&pool_lock); } static void @@ -169,8 +175,6 @@ gst_task_class_init (GstTaskClass * klass) gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstTaskPrivate)); - gobject_class->finalize = gst_task_finalize; init_klass_pool (klass); @@ -183,19 +187,18 @@ gst_task_init (GstTask * task) klass = GST_TASK_GET_CLASS (task); - task->priv = GST_TASK_GET_PRIVATE (task); + task->priv = gst_task_get_instance_private (task); task->running = FALSE; task->thread = NULL; task->lock = NULL; - task->cond = g_cond_new (); + g_cond_init (&task->cond); SET_TASK_STATE (task, GST_TASK_STOPPED); - task->priv->prio_set = FALSE; /* use the default klass pool for this task, users can * override this later */ - g_static_mutex_lock (&pool_lock); + g_mutex_lock (&pool_lock); task->priv->pool = gst_object_ref (klass->pool); - g_static_mutex_unlock (&pool_lock); + g_mutex_unlock (&pool_lock); } static void @@ -206,17 +209,20 @@ gst_task_finalize (GObject * object) GST_DEBUG ("task %p finalize", task); - if (priv->thr_notify) - priv->thr_notify (priv->thr_user_data); - priv->thr_notify = NULL; - priv->thr_user_data = NULL; + if (priv->enter_notify) + priv->enter_notify (priv->enter_user_data); + + if (priv->leave_notify) + priv->leave_notify (priv->leave_user_data); + + if (task->notify) + task->notify (task->user_data); gst_object_unref (priv->pool); /* task thread cannot be running here since it holds a ref * to the task so that the finalize could not have happened */ - g_cond_free (task->cond); - task->cond = NULL; + g_cond_clear (&task->cond); G_OBJECT_CLASS (gst_task_parent_class)->finalize (object); } @@ -241,8 +247,19 @@ gst_task_configure_name (GstTask * task) GST_DEBUG_OBJECT (task, "Failed to set thread name"); } GST_OBJECT_UNLOCK (task); -#endif -#ifdef _MSC_VER +#elif defined(HAVE_PTHREAD_SETNAME_NP_WITHOUT_TID) + const gchar *name; + + GST_OBJECT_LOCK (task); + name = GST_OBJECT_NAME (task); + + /* set the thread name to something easily identifiable */ + GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", name); + if (pthread_setname_np (name)) + GST_DEBUG_OBJECT (task, "Failed to set thread name"); + + GST_OBJECT_UNLOCK (task); +#elif defined (_MSC_VER) const gchar *name; name = GST_OBJECT_NAME (task); @@ -255,7 +272,7 @@ gst_task_configure_name (GstTask * task) static void gst_task_func (GstTask * task) { - GStaticRecMutex *lock; + GRecMutex *lock; GThread *tself; GstTaskPrivate *priv; @@ -275,63 +292,57 @@ gst_task_func (GstTask * task) if (G_UNLIKELY (lock == NULL)) goto no_lock; task->thread = tself; - /* only update the priority when it was changed */ - if (priv->prio_set) - g_thread_set_priority (tself, priv->priority); GST_OBJECT_UNLOCK (task); - /* fire the enter_thread callback when we need to */ - if (priv->thr_callbacks.enter_thread) - priv->thr_callbacks.enter_thread (task, tself, priv->thr_user_data); + /* fire the enter_func callback when we need to */ + if (priv->enter_func) + priv->enter_func (task, tself, priv->enter_user_data); /* locking order is TASK_LOCK, LOCK */ - g_static_rec_mutex_lock (lock); + g_rec_mutex_lock (lock); /* configure the thread name now */ gst_task_configure_name (task); while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) { - if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_PAUSED)) { + GST_OBJECT_LOCK (task); + while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) { + g_rec_mutex_unlock (lock); + + GST_TASK_SIGNAL (task); + GST_INFO_OBJECT (task, "Task going to paused"); + GST_TASK_WAIT (task); + GST_INFO_OBJECT (task, "Task resume from paused"); + GST_OBJECT_UNLOCK (task); + /* locking order.. */ + g_rec_mutex_lock (lock); GST_OBJECT_LOCK (task); - while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) { - g_static_rec_mutex_unlock (lock); + } - GST_TASK_SIGNAL (task); - GST_TASK_WAIT (task); - GST_OBJECT_UNLOCK (task); - /* locking order.. */ - g_static_rec_mutex_lock (lock); - - GST_OBJECT_LOCK (task); - if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) { - GST_OBJECT_UNLOCK (task); - goto done; - } - } + if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) { + GST_OBJECT_UNLOCK (task); + break; + } else { GST_OBJECT_UNLOCK (task); } - task->func (task->data); + task->func (task->user_data); } -done: - g_static_rec_mutex_unlock (lock); + + g_rec_mutex_unlock (lock); GST_OBJECT_LOCK (task); task->thread = NULL; exit: - if (priv->thr_callbacks.leave_thread) { - /* fire the leave_thread callback when we need to. We need to do this before + if (priv->leave_func) { + /* fire the leave_func callback when we need to. We need to do this before * we signal the task and with the task lock released. */ GST_OBJECT_UNLOCK (task); - priv->thr_callbacks.leave_thread (task, tself, priv->thr_user_data); + priv->leave_func (task, tself, priv->leave_user_data); GST_OBJECT_LOCK (task); - } else { - /* restore normal priority when releasing back into the pool, we will not - * touch the priority when a custom callback has been installed. */ - g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL); } /* now we allow messing with the lock again by setting the running flag to - * FALSE. Together with the SIGNAL this is the sign for the _join() to + * %FALSE. Together with the SIGNAL this is the sign for the _join() to * complete. * Note that we still have not dropped the final ref on the task. We could * check here if there is a pending join() going on and drop the last ref @@ -369,15 +380,19 @@ gst_task_cleanup_all (void) if ((klass = g_type_class_peek (GST_TYPE_TASK))) { init_klass_pool (klass); } + + /* GstElement owns a GThreadPool */ + _priv_gst_element_cleanup (); } /** * gst_task_new: * @func: The #GstTaskFunction to use - * @data: (closure): User data to pass to @func + * @user_data: User data to pass to @func + * @notify: the function to call when @user_data is no longer needed. * * Create a new Task that will repeatedly call the provided @func - * with @data as a parameter. Typically the task will run in + * with @user_data as a parameter. Typically the task will run in * a new thread. * * The function cannot be changed after the task has been created. You @@ -386,7 +401,7 @@ gst_task_cleanup_all (void) * This function will not yet create and start a thread. Use gst_task_start() or * gst_task_pause() to create and start the GThread. * - * Before the task can be used, a #GStaticRecMutex must be configured using the + * Before the task can be used, a #GRecMutex must be configured using the * gst_task_set_lock() function. This lock will always be acquired while * @func is called. * @@ -395,23 +410,29 @@ gst_task_cleanup_all (void) * MT safe. */ GstTask * -gst_task_new (GstTaskFunction func, gpointer data) +gst_task_new (GstTaskFunction func, gpointer user_data, GDestroyNotify notify) { GstTask *task; - task = g_object_newv (GST_TYPE_TASK, 0, NULL); + g_return_val_if_fail (func != NULL, NULL); + + task = g_object_new (GST_TYPE_TASK, NULL); task->func = func; - task->data = data; + task->user_data = user_data; + task->notify = notify; GST_DEBUG ("Created task %p", task); + /* clear floating flag */ + gst_object_ref_sink (task); + return task; } /** * gst_task_set_lock: * @task: The #GstTask to use - * @mutex: The #GMutex to use + * @mutex: The #GRecMutex to use * * Set the mutex used by the task. The mutex will be acquired before * calling the #GstTaskFunction. @@ -422,11 +443,14 @@ gst_task_new (GstTaskFunction func, gpointer data) * MT safe. */ void -gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex) +gst_task_set_lock (GstTask * task, GRecMutex * mutex) { + g_return_if_fail (GST_IS_TASK (task)); + GST_OBJECT_LOCK (task); if (G_UNLIKELY (task->running)) goto is_running; + GST_INFO ("setting stream lock %p on task %p", mutex, task); GST_TASK_GET_LOCK (task) = mutex; GST_OBJECT_UNLOCK (task); @@ -441,41 +465,6 @@ is_running: } /** - * gst_task_set_priority: - * @task: a #GstTask - * @priority: a new priority for @task - * - * Changes the priority of @task to @priority. - * - * Note: try not to depend on task priorities. - * - * MT safe. - * - * Since: 0.10.24 - */ -void -gst_task_set_priority (GstTask * task, GThreadPriority priority) -{ - GstTaskPrivate *priv; - GThread *thread; - - g_return_if_fail (GST_IS_TASK (task)); - - priv = task->priv; - - GST_OBJECT_LOCK (task); - priv->prio_set = TRUE; - priv->priority = priority; - thread = task->thread; - if (thread != NULL) { - /* if this task already has a thread, we can configure the priority right - * away, else we do that when we assign a thread to the task. */ - g_thread_set_priority (thread, priority); - } - GST_OBJECT_UNLOCK (task); -} - -/** * gst_task_get_pool: * @task: a #GstTask * @@ -486,8 +475,6 @@ gst_task_set_priority (GstTask * task, GThreadPriority priority) * * Returns: (transfer full): the #GstTaskPool used by @task. gst_object_unref() * after usage. - * - * Since: 0.10.24 */ GstTaskPool * gst_task_get_pool (GstTask * task) @@ -515,8 +502,6 @@ gst_task_get_pool (GstTask * task) * will be created by @task will now use @pool. * * MT safe. - * - * Since: 0.10.24 */ void gst_task_set_pool (GstTask * task, GstTaskPool * pool) @@ -541,56 +526,79 @@ gst_task_set_pool (GstTask * task, GstTaskPool * pool) gst_object_unref (old); } - /** - * gst_task_set_thread_callbacks: + * gst_task_set_enter_callback: * @task: The #GstTask to use - * @callbacks: (in): a #GstTaskThreadCallbacks pointer - * @user_data: (closure): user data passed to the callbacks + * @enter_func: (in): a #GstTaskThreadFunc + * @user_data: user data passed to @enter_func * @notify: called when @user_data is no longer referenced * - * Set callbacks which will be executed when a new thread is needed, the thread - * function is entered and left and when the thread is joined. - * - * By default a thread for @task will be created from a default thread pool. - * - * Objects can use custom GThreads or can perform additional configuration of - * the threads (such as changing the thread priority) by installing callbacks. - * - * MT safe. - * - * Since: 0.10.24 + * Call @enter_func when the task function of @task is entered. @user_data will + * be passed to @enter_func and @notify will be called when @user_data is no + * longer referenced. */ void -gst_task_set_thread_callbacks (GstTask * task, - GstTaskThreadCallbacks * callbacks, gpointer user_data, - GDestroyNotify notify) +gst_task_set_enter_callback (GstTask * task, GstTaskThreadFunc enter_func, + gpointer user_data, GDestroyNotify notify) { GDestroyNotify old_notify; g_return_if_fail (task != NULL); g_return_if_fail (GST_IS_TASK (task)); - g_return_if_fail (callbacks != NULL); GST_OBJECT_LOCK (task); - old_notify = task->priv->thr_notify; + if ((old_notify = task->priv->enter_notify)) { + gpointer old_data = task->priv->enter_user_data; - if (old_notify) { - gpointer old_data; + task->priv->enter_user_data = NULL; + task->priv->enter_notify = NULL; + GST_OBJECT_UNLOCK (task); - old_data = task->priv->thr_user_data; + old_notify (old_data); - task->priv->thr_user_data = NULL; - task->priv->thr_notify = NULL; + GST_OBJECT_LOCK (task); + } + task->priv->enter_func = enter_func; + task->priv->enter_user_data = user_data; + task->priv->enter_notify = notify; + GST_OBJECT_UNLOCK (task); +} + +/** + * gst_task_set_leave_callback: + * @task: The #GstTask to use + * @leave_func: (in): a #GstTaskThreadFunc + * @user_data: user data passed to @leave_func + * @notify: called when @user_data is no longer referenced + * + * Call @leave_func when the task function of @task is left. @user_data will + * be passed to @leave_func and @notify will be called when @user_data is no + * longer referenced. + */ +void +gst_task_set_leave_callback (GstTask * task, GstTaskThreadFunc leave_func, + gpointer user_data, GDestroyNotify notify) +{ + GDestroyNotify old_notify; + + g_return_if_fail (task != NULL); + g_return_if_fail (GST_IS_TASK (task)); + + GST_OBJECT_LOCK (task); + if ((old_notify = task->priv->leave_notify)) { + gpointer old_data = task->priv->leave_user_data; + + task->priv->leave_user_data = NULL; + task->priv->leave_notify = NULL; GST_OBJECT_UNLOCK (task); old_notify (old_data); GST_OBJECT_LOCK (task); } - task->priv->thr_callbacks = *callbacks; - task->priv->thr_user_data = user_data; - task->priv->thr_notify = notify; + task->priv->leave_func = leave_func; + task->priv->leave_user_data = user_data; + task->priv->leave_notify = notify; GST_OBJECT_UNLOCK (task); } @@ -664,8 +672,6 @@ start_task (GstTask * task) * MT safe. * * Returns: %TRUE if the state could be changed. - * - * Since: 0.10.24 */ gboolean gst_task_set_state (GstTask * task, GstTaskState state) @@ -796,10 +802,10 @@ gst_task_join (GstTask * task) gpointer id; GstTaskPool *pool = NULL; - priv = task->priv; - g_return_val_if_fail (GST_IS_TASK (task), FALSE); + priv = task->priv; + tself = g_thread_self (); GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);