pad: make an ACCEPT_CAPS query
[platform/upstream/gstreamer.git] / gst / gsttask.c
index 5ab1c03..b904fc9 100644 (file)
@@ -34,7 +34,7 @@
  * is typically done when the demuxer can perform random access on the upstream
  * peer element for improved performance.
  *
- * Although convenience functions exist on #GstPad to start/pause/stop tasks, it 
+ * Although convenience functions exist on #GstPad to start/pause/stop tasks, it
  * might sometimes be needed to create a #GstTask manually if it is not related to
  * a #GstPad.
  *
@@ -45,7 +45,7 @@
  * and gst_task_stop() respectively or with the gst_task_set_state() function.
  *
  * A #GstTask will repeatedly call the #GstTaskFunction with the user data
- * that was provided when creating the task with gst_task_create(). While calling
+ * that was provided when creating the task with gst_task_new(). While calling
  * the function it will acquire the provided lock. The provided lock is released
  * when the task pauses or stops.
  *
  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
  * only be done it the task is not running anymore.
  *
- * Last reviewed on 2006-02-13 (0.10.4)
+ * Task functions can send a #GstMessage to send out-of-band data to the
+ * application. The application can receive messages from the #GstBus in its
+ * mainloop.
+ *
+ * For debugging perposes, the task will configure its object name as the thread
+ * name on Linux. Please note that the object name should be configured before the
+ * task is started; changing the object name after the task has been started, has
+ * no effect on the thread name.
+ *
+ * Last reviewed on 2010-03-15 (0.10.29)
  */
 
 #include "gst_private.h"
 #include "gstinfo.h"
 #include "gsttask.h"
 
+#include <stdio.h>
+
+#ifdef HAVE_SYS_PRCTL_H
+#include <sys/prctl.h>
+#endif
+
 GST_DEBUG_CATEGORY_STATIC (task_debug);
 #define GST_CAT_DEFAULT (task_debug)
 
+#define SET_TASK_STATE(t,s) (g_atomic_int_set (&GST_TASK_STATE(t), (s)))
+#define GET_TASK_STATE(t)   ((GstTaskState) g_atomic_int_get (&GST_TASK_STATE(t)))
+
 #define GST_TASK_GET_PRIVATE(obj)  \
    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate))
 
@@ -88,8 +106,36 @@ struct _GstTaskPrivate
   GstTaskPool *pool_id;
 };
 
-static void gst_task_class_init (GstTaskClass * klass);
-static void gst_task_init (GstTask * task);
+#ifdef _MSC_VER
+#include <windows.h>
+
+struct _THREADNAME_INFO
+{
+  DWORD dwType;                 // must be 0x1000
+  LPCSTR szName;                // pointer to name (in user addr space)
+  DWORD dwThreadID;             // thread ID (-1=caller thread)
+  DWORD dwFlags;                // reserved for future use, must be zero
+};
+typedef struct _THREADNAME_INFO THREADNAME_INFO;
+
+void
+SetThreadName (DWORD dwThreadID, LPCSTR szThreadName)
+{
+  THREADNAME_INFO info;
+  info.dwType = 0x1000;
+  info.szName = szThreadName;
+  info.dwThreadID = dwThreadID;
+  info.dwFlags = 0;
+
+  __try {
+    RaiseException (0x406D1388, 0, sizeof (info) / sizeof (DWORD),
+        (DWORD *) & info);
+  }
+  __except (EXCEPTION_CONTINUE_EXECUTION) {
+  }
+}
+#endif
+
 static void gst_task_finalize (GObject * object);
 
 static void gst_task_func (GstTask * task);
@@ -125,7 +171,7 @@ gst_task_class_init (GstTaskClass * klass)
 
   g_type_class_add_private (klass, sizeof (GstTaskPrivate));
 
-  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
+  gobject_class->finalize = gst_task_finalize;
 
   init_klass_pool (klass);
 }
@@ -139,10 +185,10 @@ gst_task_init (GstTask * task)
 
   task->priv = GST_TASK_GET_PRIVATE (task);
   task->running = FALSE;
-  task->abidata.ABI.thread = NULL;
+  task->thread = NULL;
   task->lock = NULL;
   task->cond = g_cond_new ();
-  task->state = GST_TASK_STOPPED;
+  SET_TASK_STATE (task, GST_TASK_STOPPED);
   task->priv->prio_set = FALSE;
 
   /* use the default klass pool for this task, users can
@@ -175,6 +221,37 @@ gst_task_finalize (GObject * object)
   G_OBJECT_CLASS (gst_task_parent_class)->finalize (object);
 }
 
+/* should be called with the object LOCK */
+static void
+gst_task_configure_name (GstTask * task)
+{
+#if defined(HAVE_SYS_PRCTL_H) && defined(PR_SET_NAME)
+  const gchar *name;
+  gchar thread_name[17] = { 0, };
+
+  GST_OBJECT_LOCK (task);
+  name = GST_OBJECT_NAME (task);
+
+  /* set the thread name to something easily identifiable */
+  if (!snprintf (thread_name, 17, "%s", GST_STR_NULL (name))) {
+    GST_DEBUG_OBJECT (task, "Could not create thread name for '%s'", name);
+  } else {
+    GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", thread_name);
+    if (prctl (PR_SET_NAME, (unsigned long int) thread_name, 0, 0, 0))
+      GST_DEBUG_OBJECT (task, "Failed to set thread name");
+  }
+  GST_OBJECT_UNLOCK (task);
+#endif
+#ifdef _MSC_VER
+  const gchar *name;
+  name = GST_OBJECT_NAME (task);
+
+  /* set the thread name to something easily identifiable */
+  GST_DEBUG_OBJECT (task, "Setting thread name to '%s'", name);
+  SetThreadName (-1, name);
+#endif
+}
+
 static void
 gst_task_func (GstTask * task)
 {
@@ -192,12 +269,12 @@ gst_task_func (GstTask * task)
    * mark our state running so that nobody can mess with
    * the mutex. */
   GST_OBJECT_LOCK (task);
-  if (task->state == GST_TASK_STOPPED)
+  if (GET_TASK_STATE (task) == GST_TASK_STOPPED)
     goto exit;
   lock = GST_TASK_GET_LOCK (task);
   if (G_UNLIKELY (lock == NULL))
     goto no_lock;
-  task->abidata.ABI.thread = tself;
+  task->thread = tself;
   /* only update the priority when it was changed */
   if (priv->prio_set)
     g_thread_set_priority (tself, priv->priority);
@@ -209,38 +286,37 @@ gst_task_func (GstTask * task)
 
   /* locking order is TASK_LOCK, LOCK */
   g_static_rec_mutex_lock (lock);
-  GST_OBJECT_LOCK (task);
-  while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
-    while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
-      gint t;
+  /* configure the thread name now */
+  gst_task_configure_name (task);
 
-      t = g_static_rec_mutex_unlock_full (lock);
-      if (t <= 0) {
-        g_warning ("wrong STREAM_LOCK count %d", t);
+  while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
+    if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_PAUSED)) {
+      GST_OBJECT_LOCK (task);
+      while (G_UNLIKELY (GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
+        g_static_rec_mutex_unlock (lock);
+
+        GST_TASK_SIGNAL (task);
+        GST_TASK_WAIT (task);
+        GST_OBJECT_UNLOCK (task);
+        /* locking order.. */
+        g_static_rec_mutex_lock (lock);
+
+        GST_OBJECT_LOCK (task);
+        if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
+          GST_OBJECT_UNLOCK (task);
+          goto done;
+        }
       }
-      GST_TASK_SIGNAL (task);
-      GST_TASK_WAIT (task);
       GST_OBJECT_UNLOCK (task);
-      /* locking order.. */
-      if (t > 0)
-        g_static_rec_mutex_lock_full (lock, t);
-
-      GST_OBJECT_LOCK (task);
-      if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
-        goto done;
     }
-    GST_OBJECT_UNLOCK (task);
 
     task->func (task->data);
-
-    GST_OBJECT_LOCK (task);
   }
 done:
-  GST_OBJECT_UNLOCK (task);
   g_static_rec_mutex_unlock (lock);
 
   GST_OBJECT_LOCK (task);
-  task->abidata.ABI.thread = NULL;
+  task->thread = NULL;
 
 exit:
   if (priv->thr_callbacks.leave_thread) {
@@ -255,8 +331,8 @@ exit:
     g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL);
   }
   /* now we allow messing with the lock again by setting the running flag to
-   * FALSE. Together with the SIGNAL this is the sign for the _join() to 
-   * complete. 
+   * FALSE. Together with the SIGNAL this is the sign for the _join() to
+   * complete.
    * Note that we still have not dropped the final ref on the task. We could
    * check here if there is a pending join() going on and drop the last ref
    * before releasing the lock as we can be sure that a ref is held by the
@@ -296,9 +372,9 @@ gst_task_cleanup_all (void)
 }
 
 /**
- * gst_task_create:
+ * gst_task_new:
  * @func: The #GstTaskFunction to use
- * @data: User data to pass to @func
+ * @data: (closure): User data to pass to @func
  *
  * Create a new Task that will repeatedly call the provided @func
  * with @data as a parameter. Typically the task will run in
@@ -314,16 +390,16 @@ gst_task_cleanup_all (void)
  * gst_task_set_lock() function. This lock will always be acquired while
  * @func is called.
  *
- * Returns: A new #GstTask.
+ * Returns: (transfer full): A new #GstTask.
  *
  * MT safe.
  */
 GstTask *
-gst_task_create (GstTaskFunction func, gpointer data)
+gst_task_new (GstTaskFunction func, gpointer data)
 {
   GstTask *task;
 
-  task = g_object_new (GST_TYPE_TASK, NULL);
+  task = g_object_newv (GST_TYPE_TASK, 0, NULL);
   task->func = func;
   task->data = data;
 
@@ -390,7 +466,7 @@ gst_task_set_priority (GstTask * task, GThreadPriority priority)
   GST_OBJECT_LOCK (task);
   priv->prio_set = TRUE;
   priv->priority = priority;
-  thread = task->abidata.ABI.thread;
+  thread = task->thread;
   if (thread != NULL) {
     /* if this task already has a thread, we can configure the priority right
      * away, else we do that when we assign a thread to the task. */
@@ -408,7 +484,7 @@ gst_task_set_priority (GstTask * task, GThreadPriority priority)
  *
  * MT safe.
  *
- * Returns: the #GstTaskPool used by @task. gst_object_unref()
+ * Returns: (transfer full): the #GstTaskPool used by @task. gst_object_unref()
  * after usage.
  *
  * Since: 0.10.24
@@ -433,7 +509,7 @@ gst_task_get_pool (GstTask * task)
 /**
  * gst_task_set_pool:
  * @task: a #GstTask
- * @pool: a #GstTaskPool
+ * @pool: (transfer none): a #GstTaskPool
  *
  * Set @pool as the new GstTaskPool for @task. Any new streaming threads that
  * will be created by @task will now use @pool.
@@ -469,8 +545,8 @@ gst_task_set_pool (GstTask * task, GstTaskPool * pool)
 /**
  * gst_task_set_thread_callbacks:
  * @task: The #GstTask to use
- * @callbacks: a #GstTaskThreadCallbacks pointer
- * @user_data: user data passed to the callbacks
+ * @callbacks: (in): a #GstTaskThreadCallbacks pointer
+ * @user_data: (closure): user data passed to the callbacks
  * @notify: called when @user_data is no longer referenced
  *
  * Set callbacks which will be executed when a new thread is needed, the thread
@@ -535,9 +611,7 @@ gst_task_get_state (GstTask * task)
 
   g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
 
-  GST_OBJECT_LOCK (task);
-  result = task->state;
-  GST_OBJECT_UNLOCK (task);
+  result = GET_TASK_STATE (task);
 
   return result;
 }
@@ -548,7 +622,6 @@ static gboolean
 start_task (GstTask * task)
 {
   gboolean res = TRUE;
-  GstTaskClass *tclass;
   GError *error = NULL;
   GstTaskPrivate *priv;
 
@@ -561,8 +634,6 @@ start_task (GstTask * task)
    * and exit the task function. */
   task->running = TRUE;
 
-  tclass = GST_TASK_GET_CLASS (task);
-
   /* push on the thread pool, we remember the original pool because the user
    * could change it later on and then we join to the wrong pool. */
   priv->pool_id = gst_object_ref (priv->pool);
@@ -612,9 +683,9 @@ gst_task_set_state (GstTask * task, GstTaskState state)
       goto no_lock;
 
   /* if the state changed, do our thing */
-  old = task->state;
+  old = GET_TASK_STATE (task);
   if (old != state) {
-    task->state = state;
+    SET_TASK_STATE (task, state);
     switch (old) {
       case GST_TASK_STOPPED:
         /* If the task already has a thread scheduled we don't have to do
@@ -710,7 +781,7 @@ gst_task_pause (GstTask * task)
  * The task will automatically be stopped with this call.
  *
  * This function cannot be called from within a task function as this
- * would cause a deadlock. The function will detect this and print a 
+ * would cause a deadlock. The function will detect this and print a
  * g_warning.
  *
  * Returns: %TRUE if the task could be joined.
@@ -736,9 +807,9 @@ gst_task_join (GstTask * task)
   /* we don't use a real thread join here because we are using
    * thread pools */
   GST_OBJECT_LOCK (task);
-  if (G_UNLIKELY (tself == task->abidata.ABI.thread))
+  if (G_UNLIKELY (tself == task->thread))
     goto joining_self;
-  task->state = GST_TASK_STOPPED;
+  SET_TASK_STATE (task, GST_TASK_STOPPED);
   /* signal the state change for when it was blocked in PAUSED. */
   GST_TASK_SIGNAL (task);
   /* we set the running flag when pushing the task on the thread pool.
@@ -747,7 +818,7 @@ gst_task_join (GstTask * task)
   while (G_LIKELY (task->running))
     GST_TASK_WAIT (task);
   /* clean the thread */
-  task->abidata.ABI.thread = NULL;
+  task->thread = NULL;
   /* get the id and pool to join */
   pool = priv->pool_id;
   id = priv->id;