check/generic/states.c: Make sure all tasks are stopped.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 24 Aug 2005 20:49:53 +0000 (20:49 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 24 Aug 2005 20:49:53 +0000 (20:49 +0000)
Original commit message from CVS:
* check/generic/states.c: (GST_START_TEST):
Make sure all tasks are stopped.

* check/gst/gstbin.c: (GST_START_TEST):
Unref after usage for proper valgrinding.

* gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
Really wait for the task to stop before destroying the
mutex.

* gst/gstqueue.c: (gst_queue_sink_activate_push),
(gst_queue_src_activate_push):
Small cleanups. Don't stop the task when we did not start
it.

* gst/gsttask.c: (gst_task_get_type), (gst_task_init),
(gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
(gst_task_get_state), (gst_task_start), (gst_task_pause),
(gst_task_join):
* gst/gsttask.h:
Protect the stream lock with the object lock.
Disallow setting the stream lock when running.
Add cleanup_all to wait for the threadpool to finish.
Remove code to autoallocate a mutex if none was provided.
Add _join() to wait for a task to stop.
Protect the thread pool with a global lock.

ChangeLog
check/generic/states.c
check/gst/gstbin.c
gst/gstpad.c
gst/gstqueue.c
gst/gsttask.c
gst/gsttask.h
plugins/elements/gstqueue.c
tests/check/generic/states.c
tests/check/gst/gstbin.c

index e483ed8..98eed42 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,34 @@
 2005-08-24  Wim Taymans  <wim@fluendo.com>
 
+       * check/generic/states.c: (GST_START_TEST):
+       Make sure all tasks are stopped.
+
+       * check/gst/gstbin.c: (GST_START_TEST):
+       Unref after usage for proper valgrinding.
+
+       * gst/gstpad.c: (gst_pad_finalize), (gst_pad_stop_task):
+       Really wait for the task to stop before destroying the
+       mutex.
+
+       * gst/gstqueue.c: (gst_queue_sink_activate_push),
+       (gst_queue_src_activate_push):
+       Small cleanups. Don't stop the task when we did not start
+       it.
+
+       * gst/gsttask.c: (gst_task_get_type), (gst_task_init),
+       (gst_task_func), (gst_task_cleanup_all), (gst_task_set_lock),
+       (gst_task_get_state), (gst_task_start), (gst_task_pause),
+       (gst_task_join):
+       * gst/gsttask.h:
+       Protect the stream lock with the object lock.
+       Disallow setting the stream lock when running.
+       Add cleanup_all to wait for the threadpool to finish.
+       Remove code to autoallocate a mutex if none was provided.
+       Add _join() to wait for a task to stop.
+       Protect the thread pool with a global lock.
+
+2005-08-24  Wim Taymans  <wim@fluendo.com>
+
        * gst/base/gstbasesink.c: (gst_base_sink_handle_object),
        (gst_base_sink_get_times), (gst_base_sink_do_sync),
        (gst_base_sink_handle_buffer), (gst_base_sink_change_state):
index 3b9b961..a441686 100644 (file)
@@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes)
     gst_element_set_state (element, GST_STATE_PAUSED);
     gst_element_set_state (element, GST_STATE_NULL);
     gst_object_unref (GST_OBJECT (element));
+
+    gst_task_cleanup_all ();
   }
 }
 
index f39f5c0..fffada3 100644 (file)
@@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked)
   /* check if pads really are linked */
   fail_unless (gst_pad_is_linked (srcpad));
   fail_unless (gst_pad_is_linked (sinkpad));
+
+  gst_object_unref (srcpad);
+  gst_object_unref (sinkpad);
+  gst_object_unref (pipeline);
 }
 
 GST_END_TEST;
index d61418b..21ce573 100644 (file)
@@ -295,6 +295,14 @@ static void
 gst_pad_finalize (GObject * object)
 {
   GstPad *pad = GST_PAD (object);
+  GstTask *task;
+
+  /* in case the task is still around, clean it up */
+  if ((task = GST_PAD_TASK (pad))) {
+    gst_task_join (task);
+    GST_PAD_TASK (pad) = NULL;
+    gst_object_unref (task);
+  }
 
   if (pad->stream_rec_lock) {
     g_static_rec_mutex_free (pad->stream_rec_lock);
@@ -1878,8 +1886,8 @@ gst_pad_default_fixate (GQuark field_id, const GValue * value, gpointer data)
  * @pad: a  #GstPad to fixate
  * @caps: the  #GstCaps to fixate
  *
- * Fixate a caps on the given pad. Modifies the caps in place, so you should be
- * that the caps are actually writable (see gst_caps_make_writable()).
+ * Fixate a caps on the given pad. Modifies the caps in place, so you should
+ * make sure that the caps are actually writable (see gst_caps_make_writable()).
  */
 void
 gst_pad_fixate_caps (GstPad * pad, GstCaps * caps)
@@ -3759,7 +3767,11 @@ no_task:
  * @pad: the #GstPad to stop the task of
  *
  * Stop the task of @pad. This function will also make sure that the 
- * function executed by the task will effectively stop.
+ * function executed by the task will effectively stop if not called
+ * from the GstTaskFunction.
+ *
+ * This function will deadlock if called from the GstTaskFunction of
+ * the task. Use #gst_task_pause() instead.
  *
  * Returns: a TRUE if the task could be stopped or FALSE when the pad
  * has no task.
@@ -3782,6 +3794,8 @@ gst_pad_stop_task (GstPad * pad)
   GST_STREAM_LOCK (pad);
   GST_STREAM_UNLOCK (pad);
 
+  gst_task_join (task);
+
   gst_object_unref (task);
 
   return TRUE;
index 6286794..fe42421 100644 (file)
@@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
 static gboolean
 gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
-  gboolean result = FALSE;
+  gboolean result = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
-    result = TRUE;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock chain function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
-    g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    /* step 2, make sure streaming finishes */
-    result = gst_pad_stop_task (pad);
+    /* and make sure the chain function finishes */
+    GST_STREAM_LOCK (pad);
+    GST_STREAM_UNLOCK (pad);
   }
+
   gst_object_unref (queue);
 
   return result;
@@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
     }
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock loop function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
+    /* the item add signal will unblock */
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
index fec1f8e..23de33f 100644 (file)
@@ -25,6 +25,9 @@
 #include "gstinfo.h"
 #include "gsttask.h"
 
+GST_DEBUG_CATEGORY (task_debug);
+#define GST_CAT_DEFAULT (task_debug)
+
 static void gst_task_class_init (GstTaskClass * klass);
 static void gst_task_init (GstTask * task);
 static void gst_task_finalize (GObject * object);
@@ -33,6 +36,8 @@ static void gst_task_func (GstTask * task, GstTaskClass * tclass);
 
 static GstObjectClass *parent_class = NULL;
 
+static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
+
 GType
 gst_task_get_type (void)
 {
@@ -54,6 +59,8 @@ gst_task_get_type (void)
 
     _gst_task_type =
         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
+
+    GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
   }
   return _gst_task_type;
 }
@@ -76,6 +83,7 @@ gst_task_class_init (GstTaskClass * klass)
 static void
 gst_task_init (GstTask * task)
 {
+  task->running = FALSE;
   task->lock = NULL;
   task->cond = g_cond_new ();
   task->state = GST_TASK_STOPPED;
@@ -97,16 +105,28 @@ gst_task_finalize (GObject * object)
 static void
 gst_task_func (GstTask * task, GstTaskClass * tclass)
 {
+  GStaticRecMutex *lock;
+
   GST_DEBUG ("Entering task %p, thread %p", task, g_thread_self ());
 
+  /* we have to grab the lock to get the mutex. We also
+   * mark our state running so that nobody can mess with
+   * the mutex. */
+  GST_LOCK (task);
+  if (task->state == GST_TASK_STOPPED)
+    goto exit;
+  lock = GST_TASK_GET_LOCK (task);
+  task->running = TRUE;
+  GST_UNLOCK (task);
+
   /* locking order is TASK_LOCK, LOCK */
-  GST_TASK_LOCK (task);
+  g_static_rec_mutex_lock (lock);
   GST_LOCK (task);
   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
       gint t;
 
-      t = GST_TASK_UNLOCK_FULL (task);
+      t = g_static_rec_mutex_unlock_full (lock);
       if (t <= 0) {
         g_warning ("wrong STREAM_LOCK count %d", t);
       }
@@ -115,7 +135,7 @@ gst_task_func (GstTask * task, GstTaskClass * tclass)
       GST_UNLOCK (task);
       /* locking order.. */
       if (t > 0)
-        GST_TASK_LOCK_FULL (task, t);
+        g_static_rec_mutex_lock_full (lock, t);
 
       GST_LOCK (task);
       if (task->state == GST_TASK_STOPPED)
@@ -129,7 +149,14 @@ gst_task_func (GstTask * task, GstTaskClass * tclass)
   }
 done:
   GST_UNLOCK (task);
-  GST_TASK_UNLOCK (task);
+  g_static_rec_mutex_unlock (lock);
+
+  /* now we allow messing with the lock again */
+  GST_LOCK (task);
+  task->running = FALSE;
+exit:
+  GST_TASK_SIGNAL (task);
+  GST_UNLOCK (task);
 
   GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());
 
@@ -137,6 +164,35 @@ done:
 }
 
 /**
+ * gst_task_cleanup_all:
+ *
+ * Wait for all tasks to be stopped. This is mainly used internally
+ * to ensure proper cleanup of internal datastructures in testsuites.
+ *
+ * MT safe.
+ */
+void
+gst_task_cleanup_all (void)
+{
+  GstTaskClass *klass;
+
+  if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
+    g_static_mutex_lock (&pool_lock);
+    if (klass->pool) {
+      /* Shut down all the threads, we still process the ones scheduled
+       * because the unref happens in the thread function.
+       * Also wait for currently running ones to finish. */
+      g_thread_pool_free (klass->pool, FALSE, TRUE);
+      /* create new pool, so we can still do something after this
+       * call. */
+      klass->pool = g_thread_pool_new (
+          (GFunc) gst_task_func, klass, -1, FALSE, NULL);
+    }
+    g_static_mutex_unlock (&pool_lock);
+  }
+}
+
+/**
  * gst_task_create:
  * @func: The #GstTaskFunction to use
  * @data: User data to pass to @func
@@ -176,8 +232,19 @@ void
 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
 {
   GST_LOCK (task);
-  task->lock = mutex;
+  if (task->running)
+    goto is_running;
+  GST_TASK_GET_LOCK (task) = mutex;
   GST_UNLOCK (task);
+
+  return;
+
+  /* ERRORS */
+is_running:
+  {
+    g_warning ("cannot call set_lock on a running task");
+    GST_UNLOCK (task);
+  }
 }
 
 
@@ -218,39 +285,51 @@ gst_task_get_state (GstTask * task)
 gboolean
 gst_task_start (GstTask * task)
 {
-  GstTaskClass *tclass;
   GstTaskState old;
-  GStaticRecMutex *lock;
 
   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
 
-  tclass = GST_TASK_GET_CLASS (task);
-
   GST_DEBUG_OBJECT (task, "Starting task %p", task);
 
   GST_LOCK (task);
-  if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) {
-    lock = g_new (GStaticRecMutex, 1);
-    g_static_rec_mutex_init (lock);
-    GST_TASK_GET_LOCK (task) = lock;
-  }
+  if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
+    goto no_lock;
 
   old = task->state;
   task->state = GST_TASK_STARTED;
   switch (old) {
     case GST_TASK_STOPPED:
+    {
+      GstTaskClass *tclass;
+
+      tclass = GST_TASK_GET_CLASS (task);
+
+      /* new task, push on threadpool. We ref before so
+       * that it remains alive while on the threadpool. */
       gst_object_ref (task);
+      g_static_mutex_lock (&pool_lock);
       g_thread_pool_push (tclass->pool, task, NULL);
+      g_static_mutex_unlock (&pool_lock);
       break;
+    }
     case GST_TASK_PAUSED:
+      /* PAUSE to PLAY, signal */
       GST_TASK_SIGNAL (task);
       break;
     case GST_TASK_STARTED:
+      /* was OK */
       break;
   }
   GST_UNLOCK (task);
 
   return TRUE;
+
+  /* ERRORS */
+no_lock:
+  {
+    g_warning ("starting task without a lock");
+    return FALSE;
+  }
 }
 
 /**
@@ -305,13 +384,10 @@ gst_task_stop (GstTask * task)
 gboolean
 gst_task_pause (GstTask * task)
 {
-  GstTaskClass *tclass;
   GstTaskState old;
 
   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
 
-  tclass = GST_TASK_GET_CLASS (task);
-
   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
 
   GST_LOCK (task);
@@ -319,9 +395,17 @@ gst_task_pause (GstTask * task)
   task->state = GST_TASK_PAUSED;
   switch (old) {
     case GST_TASK_STOPPED:
+    {
+      GstTaskClass *tclass;
+
+      tclass = GST_TASK_GET_CLASS (task);
+
       gst_object_ref (task);
+      g_static_mutex_lock (&pool_lock);
       g_thread_pool_push (tclass->pool, task, NULL);
+      g_static_mutex_unlock (&pool_lock);
       break;
+    }
     case GST_TASK_PAUSED:
       break;
     case GST_TASK_STARTED:
@@ -331,3 +415,35 @@ gst_task_pause (GstTask * task)
 
   return TRUE;
 }
+
+/**
+ * gst_task_join:
+ * @task: The #GstTask to join
+ *
+ * Joins @task. After this call, it is safe to unref the task
+ * and clean up the lock set with #gst_task_set_lock().
+ *
+ * The task will automatically be stopped with this call.
+ *
+ * This function cannot be called from within a task function.
+ *
+ * Returns: TRUE if the task could be joined.
+ *
+ * MT safe.
+ */
+gboolean
+gst_task_join (GstTask * task)
+{
+  g_return_val_if_fail (GST_IS_TASK (task), FALSE);
+
+  GST_DEBUG_OBJECT (task, "Joining task %p", task);
+
+  GST_LOCK (task);
+  task->state = GST_TASK_STOPPED;
+  GST_TASK_SIGNAL (task);
+  while (task->running)
+    GST_TASK_WAIT (task);
+  GST_UNLOCK (task);
+
+  return TRUE;
+}
index c1210c3..33c26b0 100644 (file)
@@ -55,10 +55,6 @@ typedef enum {
 #define GST_TASK_BROADCAST(task)       g_cond_breadcast(GST_TASK_GET_COND (task))
 
 #define GST_TASK_GET_LOCK(task)                (GST_TASK_CAST(task)->lock)
-#define GST_TASK_LOCK(task)            g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK(task)          g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
-#define GST_TASK_UNLOCK_FULL(task)     g_static_rec_mutex_unlock_full(GST_TASK_GET_LOCK(task))
-#define GST_TASK_LOCK_FULL(task,t)     g_static_rec_mutex_lock_full(GST_TASK_GET_LOCK(task),(t))
 
 struct _GstTask {
   GstObject      object;
@@ -69,8 +65,10 @@ struct _GstTask {
 
   GStaticRecMutex *lock;
 
-  GstTaskFunction func;
-  gpointer      data;
+  GstTaskFunction  func;
+  gpointer        data;
+
+  gboolean        running;
 
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
@@ -86,6 +84,8 @@ struct _GstTaskClass {
   gpointer _gst_reserved[GST_PADDING];
 };
 
+void           gst_task_cleanup_all    (void);
+
 GType           gst_task_get_type       (void);
 
 GstTask*       gst_task_create         (GstTaskFunction func, gpointer data);
@@ -97,6 +97,8 @@ gboolean      gst_task_start          (GstTask *task);
 gboolean       gst_task_stop           (GstTask *task);
 gboolean       gst_task_pause          (GstTask *task);
 
+gboolean       gst_task_join           (GstTask *task);
+
 G_END_DECLS
 
 #endif /* __GST_TASK_H__ */
index 6286794..fe42421 100644 (file)
@@ -927,25 +927,27 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
 static gboolean
 gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
-  gboolean result = FALSE;
+  gboolean result = TRUE;
   GstQueue *queue;
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
+    GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_OK;
-    result = TRUE;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock chain function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
-    g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
-    /* step 2, make sure streaming finishes */
-    result = gst_pad_stop_task (pad);
+    /* and make sure the chain function finishes */
+    GST_STREAM_LOCK (pad);
+    GST_STREAM_UNLOCK (pad);
   }
+
   gst_object_unref (queue);
 
   return result;
@@ -971,9 +973,10 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
     }
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
-    /* step 1, unblock chain and loop functions */
+    /* step 1, unblock loop function */
     GST_QUEUE_MUTEX_LOCK (queue);
     queue->srcresult = GST_FLOW_WRONG_STATE;
+    /* the item add signal will unblock */
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK (queue);
 
index 3b9b961..a441686 100644 (file)
@@ -50,6 +50,8 @@ GST_START_TEST (test_state_changes)
     gst_element_set_state (element, GST_STATE_PAUSED);
     gst_element_set_state (element, GST_STATE_NULL);
     gst_object_unref (GST_OBJECT (element));
+
+    gst_task_cleanup_all ();
   }
 }
 
index f39f5c0..fffada3 100644 (file)
@@ -423,6 +423,10 @@ GST_START_TEST (test_add_linked)
   /* check if pads really are linked */
   fail_unless (gst_pad_is_linked (srcpad));
   fail_unless (gst_pad_is_linked (sinkpad));
+
+  gst_object_unref (srcpad);
+  gst_object_unref (sinkpad);
+  gst_object_unref (pipeline);
 }
 
 GST_END_TEST;