TaskPool: remove _set_func()
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 24 Apr 2009 10:35:08 +0000 (12:35 +0200)
committerWim Taymans <wim@metal.(none)>
Mon, 11 May 2009 22:27:38 +0000 (00:27 +0200)
Remove the static function set on the TaskPool before _prepare() is called and
allow for assigning a function to a Task when we _push().
Update the examples

docs/gst/gstreamer-sections.txt
gst/gsttask.c
gst/gsttaskpool.c
gst/gsttaskpool.h
tests/examples/streams/.gitignore
tests/examples/streams/testrtpool.c
win32/common/libgstreamer.def

index 1fd6afd..7a265c2 100644 (file)
@@ -2173,8 +2173,8 @@ gst_tag_setter_get_type
 <TITLE>GstTaskPool</TITLE>
 GstTaskPool
 GstTaskPoolClass
+GstTaskPoolFunction
 gst_task_pool_new
-gst_task_pool_set_func
 gst_task_pool_prepare
 gst_task_pool_push
 gst_task_pool_join
index 3aec20f..dc361b5 100644 (file)
@@ -92,7 +92,7 @@ static void gst_task_class_init (GstTaskClass * klass);
 static void gst_task_init (GstTask * task);
 static void gst_task_finalize (GObject * object);
 
-static void gst_task_func (GstTask * task, GstTaskClass * tclass);
+static void gst_task_func (GstTask * task);
 
 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
 
@@ -112,7 +112,6 @@ init_klass_pool (GstTaskClass * klass)
     gst_object_unref (klass->pool);
   }
   klass->pool = gst_task_pool_new ();
-  gst_task_pool_set_func (klass->pool, (GFunc) gst_task_func, klass);
   gst_task_pool_prepare (klass->pool, NULL);
   g_static_mutex_unlock (&pool_lock);
 }
@@ -177,7 +176,7 @@ gst_task_finalize (GObject * object)
 }
 
 static void
-gst_task_func (GstTask * task, GstTaskClass * tclass)
+gst_task_func (GstTask * task)
 {
   GStaticRecMutex *lock;
   GThread *tself;
@@ -564,7 +563,9 @@ start_task (GstTask * task)
   /* push on the thread pool, we remember the original pool because the user
    * could change it later on and then we join to the wrong pool. */
   priv->pool_id = gst_object_ref (priv->pool);
-  priv->id = gst_task_pool_push (priv->pool_id, task, &error);
+  priv->id =
+      gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func,
+      task, &error);
 
   if (error != NULL) {
     g_warning ("failed to create thread: %s", error->message);
index 12e8f4a..e17af0b 100644 (file)
@@ -51,12 +51,30 @@ static void gst_task_pool_finalize (GObject * object);
 
 G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
 
+typedef struct
+{
+  GstTaskPoolFunction func;
+  gpointer user_data;
+} TaskData;
+
 static void
-default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data,
-    GError ** error)
+default_func (TaskData * tdata, GstTaskPool * pool)
+{
+  GstTaskPoolFunction func;
+  gpointer user_data;
+
+  func = tdata->func;
+  user_data = tdata->user_data;
+  g_slice_free (TaskData, tdata);
+
+  func (user_data);
+}
+
+static void
+default_prepare (GstTaskPool * pool, GError ** error)
 {
   GST_OBJECT_LOCK (pool);
-  pool->pool = g_thread_pool_new ((GFunc) func, user_data, -1, FALSE, NULL);
+  pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL);
   GST_OBJECT_UNLOCK (pool);
 }
 
@@ -75,11 +93,21 @@ default_cleanup (GstTaskPool * pool)
 }
 
 static gpointer
-default_push (GstTaskPool * pool, gpointer data, GError ** error)
+default_push (GstTaskPool * pool, GstTaskPoolFunction func,
+    gpointer user_data, GError ** error)
 {
+  TaskData *tdata;
+
+  tdata = g_slice_new (TaskData);
+  tdata->func = func;
+  tdata->user_data = user_data;
+
   GST_OBJECT_LOCK (pool);
   if (pool->pool)
-    g_thread_pool_push (pool->pool, data, error);
+    g_thread_pool_push (pool->pool, tdata, error);
+  else {
+    g_slice_free (TaskData, tdata);
+  }
   GST_OBJECT_UNLOCK (pool);
 
   return NULL;
@@ -88,7 +116,7 @@ default_push (GstTaskPool * pool, gpointer data, GError ** error)
 static void
 default_join (GstTaskPool * pool, gpointer id)
 {
-  /* does nothing, we can't join for threads from the threadpool */
+  /* we do nothing here, we can't join from the pools */
 }
 
 static void
@@ -141,16 +169,6 @@ gst_task_pool_new (void)
   return pool;
 }
 
-void
-gst_task_pool_set_func (GstTaskPool * pool, GFunc func, gpointer user_data)
-{
-  g_return_if_fail (GST_IS_TASK_POOL (pool));
-
-  pool->func = func;
-  pool->user_data = user_data;
-}
-
-
 /**
  * gst_task_pool_prepare:
  * @pool: a #GstTaskPool
@@ -170,7 +188,7 @@ gst_task_pool_prepare (GstTaskPool * pool, GError ** error)
   klass = GST_TASK_POOL_GET_CLASS (pool);
 
   if (klass->prepare)
-    klass->prepare (pool, pool->func, pool->user_data, error);
+    klass->prepare (pool, error);
 }
 
 /**
@@ -198,7 +216,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
 /**
  * gst_task_pool_push:
  * @pool: a #GstTaskPool
- * @data: data to pass to the thread function
+ * @func: the function to call
+ * @user_data: data to pass to @func
  * @error: return location for an error
  *
  * Start the execution of a new thread from @pool.
@@ -208,7 +227,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
  * errors.
  */
 gpointer
-gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error)
+gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func,
+    gpointer user_data, GError ** error)
 {
   GstTaskPoolClass *klass;
 
@@ -219,7 +239,7 @@ gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error)
   if (klass->push == NULL)
     goto not_supported;
 
-  return klass->push (pool, data, error);
+  return klass->push (pool, func, user_data, error);
 
   /* ERRORS */
 not_supported:
index d9f922d..eda5c6c 100644 (file)
@@ -38,6 +38,8 @@ G_BEGIN_DECLS
 typedef struct _GstTaskPool GstTaskPool;
 typedef struct _GstTaskPoolClass GstTaskPoolClass;
 
+typedef void   (*GstTaskPoolFunction)          (void *data);
+
 /**
  * GstTaskPool:
  *
@@ -47,9 +49,6 @@ struct _GstTaskPool {
   GstObject      object;
 
   /*< private >*/
-  GFunc          func;
-  gpointer       user_data;
-
   GThreadPool   *pool;
 
   gpointer _gst_reserved[GST_PADDING];
@@ -69,11 +68,11 @@ struct _GstTaskPoolClass {
   GstObjectClass parent_class;
 
   /*< public >*/
-  void      (*prepare)  (GstTaskPool *pool, GFunc func,
-                         gpointer user_data, GError **error);
+  void      (*prepare)  (GstTaskPool *pool, GError **error);
   void      (*cleanup)  (GstTaskPool *pool);
 
-  gpointer  (*push)     (GstTaskPool *pool, gpointer data, GError **error);
+  gpointer  (*push)     (GstTaskPool *pool, GstTaskPoolFunction func,
+                         gpointer user_data, GError **error);
   void      (*join)     (GstTaskPool *pool, gpointer id);
 
   /*< private >*/
@@ -83,14 +82,10 @@ struct _GstTaskPoolClass {
 GType           gst_task_pool_get_type    (void);
 
 GstTaskPool *   gst_task_pool_new         (void);
-
-void            gst_task_pool_set_func    (GstTaskPool *pool,
-                                           GFunc func, gpointer user_data);
-
 void            gst_task_pool_prepare     (GstTaskPool *pool, GError **error);
 
-gpointer        gst_task_pool_push        (GstTaskPool *pool, gpointer data,
-                                           GError **error);
+gpointer        gst_task_pool_push        (GstTaskPool *pool, GstTaskPoolFunction func, 
+                                           gpointer user_data, GError **error);
 void            gst_task_pool_join        (GstTaskPool *pool, gpointer id);
 
 void           gst_task_pool_cleanup     (GstTaskPool *pool);
index 99261ec..d8cfe30 100644 (file)
@@ -1,4 +1,5 @@
 stream-status
+rtpool-test
 *.bb
 *.bbg
 *.da
index b010b1e..41cd4ed 100644 (file)
  * Boston, MA 02111-1307, USA.
  */
 
+#include <pthread.h>
+
 #include "testrtpool.h"
 
 static void test_rt_pool_class_init (TestRTPoolClass * klass);
 static void test_rt_pool_init (TestRTPool * pool);
 static void test_rt_pool_finalize (GObject * object);
 
+typedef struct
+{
+  pthread_t thread;
+} TestRTId;
+
 G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
 
 static void
-default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data,
-    GError ** error)
+default_prepare (GstTaskPool * pool, GError ** error)
 {
+  /* we don't do anything here. We could construct a pool of threads here that
+   * we could reuse later but we don't */
   g_message ("prepare Realtime pool %p", pool);
 }
 
@@ -39,19 +47,47 @@ default_cleanup (GstTaskPool * pool)
 }
 
 static gpointer
-default_push (GstTaskPool * pool, gpointer data, GError ** error)
+default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
+    GError ** error)
 {
-  g_message ("pushing Realtime pool %p", pool);
+  TestRTId *tid;
+  gint res;
+  pthread_attr_t attr;
+  //struct sched_param param;
+
+  g_message ("pushing Realtime pool %p, %p", pool, func);
 
-  *error = g_error_new (1, 1, "not supported");
+  tid = g_slice_new0 (TestRTId);
 
-  return NULL;
+  pthread_attr_init (&attr);
+  /* 
+     pthread_attr_setschedpolicy (&attr, SCHED_RR);
+     param.sched_priority = 50;
+     pthread_attr_setschedparam (&attr, &param);
+   */
+
+  res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
+
+  if (res < 0) {
+    g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
+        "Error creating thread: %s", g_strerror (res));
+    g_slice_free (TestRTId, tid);
+    tid = NULL;
+  }
+
+  return tid;
 }
 
 static void
 default_join (GstTaskPool * pool, gpointer id)
 {
+  TestRTId *tid = (TestRTId *) id;
+
   g_message ("joining Realtime pool %p", pool);
+
+  pthread_join (tid->thread, NULL);
+
+  g_slice_free (TestRTId, tid);
 }
 
 static void
index 5b4225a..262b52a 100644 (file)
@@ -950,7 +950,6 @@ EXPORTS
        gst_task_pool_new
        gst_task_pool_prepare
        gst_task_pool_push
-       gst_task_pool_set_func
        gst_task_set_lock
        gst_task_set_pool
        gst_task_set_priority