* is typically done when the demuxer can perform random access on the upstream
* peer element for improved performance.
*
- * Although convenience functions exist on #GstPad to start/pause/stop tasks, it
+ * Although convenience functions exist on #GstPad to start/pause/stop tasks, it
* might sometimes be needed to create a #GstTask manually if it is not related to
* a #GstPad.
*
* and gst_task_stop() respectively or with the gst_task_set_state() function.
*
* A #GstTask will repeatedly call the #GstTaskFunction with the user data
- * that was provided when creating the task with gst_task_create(). While calling
+ * that was provided when creating the task with gst_task_new(). While calling
* the function it will acquire the provided lock. The provided lock is released
* when the task pauses or stops.
*
* After creating a #GstTask, use gst_object_unref() to free its resources. This can
* only be done it the task is not running anymore.
*
- * Last reviewed on 2006-02-13 (0.10.4)
+ * Task functions can send a #GstMessage to send out-of-band data to the
+ * application. The application can receive messages from the #GstBus in its
+ * mainloop.
+ *
+ * For debugging perposes, the task will configure its object name as the thread
+ * name on Linux. Please note that the object name should be configured before the
+ * task is started; changing the object name after the task has been started, has
+ * no effect on the thread name.
+ *
+ * Last reviewed on 2010-03-15 (0.10.29)
*/
#include "gst_private.h"
#include "gstinfo.h"
#include "gsttask.h"
+#include <stdio.h>
+
+#ifdef HAVE_SYS_PRCTL_H
+#include <sys/prctl.h>
+#endif
+
GST_DEBUG_CATEGORY_STATIC (task_debug);
#define GST_CAT_DEFAULT (task_debug)
+#define SET_TASK_STATE(t,s) (g_atomic_int_set (&GST_TASK_STATE(t), (s)))
+#define GET_TASK_STATE(t) ((GstTaskState) g_atomic_int_get (&GST_TASK_STATE(t)))
+
#define GST_TASK_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate))
GstTaskPool *pool_id;
};
-static void gst_task_class_init (GstTaskClass * klass);
-static void gst_task_init (GstTask * task);
+#ifdef _MSC_VER
+#include <windows.h>
+
+struct _THREADNAME_INFO
+{
+ DWORD dwType; // must be 0x1000
+ LPCSTR szName; // pointer to name (in user addr space)
+ DWORD dwThreadID; // thread ID (-1=caller thread)
+ DWORD dwFlags; // reserved for future use, must be zero
+};
+typedef struct _THREADNAME_INFO THREADNAME_INFO;
+
+void
+SetThreadName (DWORD dwThreadID, LPCSTR szThreadName)
+{
+ THREADNAME_INFO info;
+ info.dwType = 0x1000;
+ info.szName = szThreadName;
+ info.dwThreadID = dwThreadID;
+ info.dwFlags = 0;
+
+ __try {
+ RaiseException (0x406D1388, 0, sizeof (info) / sizeof (DWORD),
+ (DWORD *) & info);
+ }
+ __except (EXCEPTION_CONTINUE_EXECUTION) {
+ }
+}
+#endif
+
static void gst_task_finalize (GObject * object);
static void gst_task_func (GstTask * task);
task->priv = GST_TASK_GET_PRIVATE (task);
task->running = FALSE;
- task->abidata.ABI.thread = NULL;
+ task->thread = NULL;
task->lock = NULL;
task->cond = g_cond_new ();
- task->state = GST_TASK_STOPPED;
+ SET_TASK_STATE (task, GST_TASK_STOPPED);
task->priv->prio_set = FALSE;
/* use the default klass pool for this task, users can
G_OBJECT_CLASS (gst_task_parent_class)->finalize (object);
}
+/* should be called with the object LOCK */
+static void
+gst_task_configure_name (GstTask * task)
+{
+#if defined(HAVE_SYS_PRCTL_H) && defined(PR_SET_NAME)
+ const gchar *name;
+ gchar thread_name[17] = { 0, };
+
+ GST_OBJECT_LOCK (task);
+ name = GST_OBJECT_NAME (task);
+
+ /* set the thread name to something easily identifiable */
+ if (!snprintf (thread_name, 17, "%s", GST_STR_NULL (name))) {
+ GST_DEBUG_OBJECT (task, "Could not create thread name for '%s'", name);
+ } else {
+ GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", thread_name);
+ if (prctl (PR_SET_NAME, (unsigned long int) thread_name, 0, 0, 0))
+ GST_DEBUG_OBJECT (task, "Failed to set thread name");
+ }
+ GST_OBJECT_UNLOCK (task);
+#endif
+#ifdef _MSC_VER
+ const gchar *name;
+ name = GST_OBJECT_NAME (task);
+
+ /* set the thread name to something easily identifiable */
+ GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", name);
+ SetThreadName (-1, name);
+#endif
+}
+
static void
gst_task_func (GstTask * task)
{
* mark our state running so that nobody can mess with
* the mutex. */
GST_OBJECT_LOCK (task);
- if (task->state == GST_TASK_STOPPED)
+ if (GET_TASK_STATE (task) == GST_TASK_STOPPED)
goto exit;
lock = GST_TASK_GET_LOCK (task);
if (G_UNLIKELY (lock == NULL))
goto no_lock;
- task->abidata.ABI.thread = tself;
+ task->thread = tself;
/* only update the priority when it was changed */
if (priv->prio_set)
g_thread_set_priority (tself, priv->priority);
/* locking order is TASK_LOCK, LOCK */
g_static_rec_mutex_lock (lock);
- GST_OBJECT_LOCK (task);
- while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
- while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
- gint t;
+ /* configure the thread name now */
+ gst_task_configure_name (task);
+
+ while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
+ if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_PAUSED)) {
+ GST_OBJECT_LOCK (task);
+ while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
+ g_static_rec_mutex_unlock (lock);
- t = g_static_rec_mutex_unlock_full (lock);
- if (t <= 0) {
- g_warning ("wrong STREAM_LOCK count %d", t);
+ GST_TASK_SIGNAL (task);
+ GST_TASK_WAIT (task);
+ GST_OBJECT_UNLOCK (task);
+ /* locking order.. */
+ g_static_rec_mutex_lock (lock);
+
+ GST_OBJECT_LOCK (task);
+ if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
+ GST_OBJECT_UNLOCK (task);
+ goto done;
+ }
}
- GST_TASK_SIGNAL (task);
- GST_TASK_WAIT (task);
GST_OBJECT_UNLOCK (task);
- /* locking order.. */
- if (t > 0)
- g_static_rec_mutex_lock_full (lock, t);
-
- GST_OBJECT_LOCK (task);
- if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
- goto done;
}
- GST_OBJECT_UNLOCK (task);
task->func (task->data);
-
- GST_OBJECT_LOCK (task);
}
done:
- GST_OBJECT_UNLOCK (task);
g_static_rec_mutex_unlock (lock);
GST_OBJECT_LOCK (task);
- task->abidata.ABI.thread = NULL;
+ task->thread = NULL;
exit:
if (priv->thr_callbacks.leave_thread) {
g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL);
}
/* now we allow messing with the lock again by setting the running flag to
- * FALSE. Together with the SIGNAL this is the sign for the _join() to
- * complete.
+ * FALSE. Together with the SIGNAL this is the sign for the _join() to
+ * complete.
* Note that we still have not dropped the final ref on the task. We could
* check here if there is a pending join() going on and drop the last ref
* before releasing the lock as we can be sure that a ref is held by the
}
/**
- * gst_task_create:
+ * gst_task_new:
* @func: The #GstTaskFunction to use
- * @data: User data to pass to @func
+ * @data: (closure): User data to pass to @func
*
* Create a new Task that will repeatedly call the provided @func
* with @data as a parameter. Typically the task will run in
* gst_task_set_lock() function. This lock will always be acquired while
* @func is called.
*
- * Returns: A new #GstTask.
+ * Returns: (transfer full): A new #GstTask.
*
* MT safe.
*/
GstTask *
-gst_task_create (GstTaskFunction func, gpointer data)
+gst_task_new (GstTaskFunction func, gpointer data)
{
GstTask *task;
GST_OBJECT_LOCK (task);
priv->prio_set = TRUE;
priv->priority = priority;
- thread = task->abidata.ABI.thread;
+ thread = task->thread;
if (thread != NULL) {
/* if this task already has a thread, we can configure the priority right
* away, else we do that when we assign a thread to the task. */
*
* MT safe.
*
- * Returns: the #GstTaskPool used by @task. gst_object_unref()
+ * Returns: (transfer full): the #GstTaskPool used by @task. gst_object_unref()
* after usage.
*
* Since: 0.10.24
/**
* gst_task_set_pool:
* @task: a #GstTask
- * @pool: a #GstTaskPool
+ * @pool: (transfer none): a #GstTaskPool
*
* Set @pool as the new GstTaskPool for @task. Any new streaming threads that
* will be created by @task will now use @pool.
/**
* gst_task_set_thread_callbacks:
* @task: The #GstTask to use
- * @callbacks: a #GstTaskThreadCallbacks pointer
- * @user_data: user data passed to the callbacks
+ * @callbacks: (in): a #GstTaskThreadCallbacks pointer
+ * @user_data: (closure): user data passed to the callbacks
* @notify: called when @user_data is no longer referenced
*
* Set callbacks which will be executed when a new thread is needed, the thread
g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
- GST_OBJECT_LOCK (task);
- result = task->state;
- GST_OBJECT_UNLOCK (task);
+ result = GET_TASK_STATE (task);
return result;
}
goto no_lock;
/* if the state changed, do our thing */
- old = task->state;
+ old = GET_TASK_STATE (task);
if (old != state) {
- task->state = state;
+ SET_TASK_STATE (task, state);
switch (old) {
case GST_TASK_STOPPED:
/* If the task already has a thread scheduled we don't have to do
* The task will automatically be stopped with this call.
*
* This function cannot be called from within a task function as this
- * would cause a deadlock. The function will detect this and print a
+ * would cause a deadlock. The function will detect this and print a
* g_warning.
*
* Returns: %TRUE if the task could be joined.
/* we don't use a real thread join here because we are using
* thread pools */
GST_OBJECT_LOCK (task);
- if (G_UNLIKELY (tself == task->abidata.ABI.thread))
+ if (G_UNLIKELY (tself == task->thread))
goto joining_self;
- task->state = GST_TASK_STOPPED;
+ SET_TASK_STATE (task, GST_TASK_STOPPED);
/* signal the state change for when it was blocked in PAUSED. */
GST_TASK_SIGNAL (task);
/* we set the running flag when pushing the task on the thread pool.
while (G_LIKELY (task->running))
GST_TASK_WAIT (task);
/* clean the thread */
- task->abidata.ABI.thread = NULL;
+ task->thread = NULL;
/* get the id and pool to join */
pool = priv->pool_id;
id = priv->id;