Refine the cl thread implement for queue.
authorJunyan He <junyan.he@linux.intel.com>
Fri, 30 May 2014 06:28:30 +0000 (14:28 +0800)
committerZhigang Gong <zhigang.gong@intel.com>
Fri, 30 May 2014 06:55:22 +0000 (14:55 +0800)
Because the cl_command_queue can be used in several threads simultaneously but
without add ref to it, we now handle it like this:
Keep one threads_slot_array, every time the thread get gpgpu or batch buffer, if it
does not have a slot, assign it.
The resources are keeped in queue private, and resize it if needed.
When the thread exit, the slot will be set invalid.
When queue released, all the resources will be released. If user still enqueue, flush
or finish the queue after it has been released, the behavior is undefined.
TODO: Need to shrink the slot map.

Signed-off-by: Junyan He <junyan.he@linux.intel.com>
Reviewed-by: Zhigang Gong <zhigang.gong@linux.intel.com>
src/cl_command_queue.c
src/cl_command_queue_gen7.c
src/cl_context.c
src/cl_device_id.c
src/cl_thread.c
src/cl_thread.h

index 6a699c0..a2109d7 100644 (file)
@@ -89,7 +89,7 @@ cl_command_queue_delete(cl_command_queue queue)
     queue->fulsim_out = NULL;
   }
 
-  cl_thread_data_destroy(queue->thread_data);
+  cl_thread_data_destroy(queue);
   queue->thread_data = NULL;
   cl_mem_delete(queue->perf);
   cl_context_delete(queue->ctx);
@@ -430,7 +430,7 @@ cl_command_queue_flush(cl_command_queue queue)
 LOCAL cl_int
 cl_command_queue_finish(cl_command_queue queue)
 {
-  cl_gpgpu_sync(cl_get_thread_batch_buf());
+  cl_gpgpu_sync(cl_get_thread_batch_buf(queue));
   return CL_SUCCESS;
 }
 
index 891d6f1..d875021 100644 (file)
@@ -327,7 +327,7 @@ cl_command_queue_ND_range_gen7(cl_command_queue queue,
   /* Start a new batch buffer */
   batch_sz = cl_kernel_compute_batch_sz(ker);
   cl_gpgpu_batch_reset(gpgpu, batch_sz);
-  cl_set_thread_batch_buf(cl_gpgpu_ref_batch_buf(gpgpu));
+  cl_set_thread_batch_buf(queue, cl_gpgpu_ref_batch_buf(gpgpu));
   cl_gpgpu_batch_start(gpgpu);
 
   /* Issue the GPGPU_WALKER command */
index 8190e6a..1911bf2 100644 (file)
@@ -203,7 +203,6 @@ cl_context_delete(cl_context ctx)
   assert(ctx->buffers == NULL);
   assert(ctx->drv);
   cl_free(ctx->prop_user);
-  cl_set_thread_batch_buf(NULL);
   cl_driver_delete(ctx->drv);
   ctx->magic = CL_MAGIC_DEAD_HEADER; /* For safety */
   cl_free(ctx);
index 427c50e..d2b3bed 100644 (file)
@@ -107,7 +107,6 @@ LOCAL cl_device_id
 cl_get_gt_device(void)
 {
   cl_device_id ret = NULL;
-  cl_set_thread_batch_buf(NULL);
   const int device_id = cl_driver_get_device_id();
   cl_device_id device = NULL;
 
index cadc3cd..4692ed7 100644 (file)
  * License along with this library. If not, see <http://www.gnu.org/licenses/>.
  *
  */
+#include <string.h>
+#include <stdio.h>
 
 #include "cl_thread.h"
 #include "cl_alloc.h"
 #include "cl_utils.h"
 
-static __thread void* thread_batch_buf = NULL;
-
-typedef struct _cl_thread_spec_data {
+/* Because the cl_command_queue can be used in several threads simultaneously but
+   without add ref to it, we now handle it like this:
+   Keep one threads_slot_array, every time the thread get gpgpu or batch buffer, if it
+   does not have a slot, assign it.
+   The resources are keeped in queue private, and resize it if needed.
+   When the thread exit, the slot will be set invalid.
+   When queue released, all the resources will be released. If user still enqueue, flush
+   or finish the queue after it has been released, the behavior is undefined.
+   TODO: Need to shrink the slot map.
+   */
+
+static int thread_array_num = 1;
+static int *thread_slot_map = NULL;
+static int thread_magic_num = 1;
+static pthread_mutex_t thread_queue_map_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_key_t destroy_key;
+
+static __thread int thread_id = -1;
+static __thread int thread_magic = -1;
+
+typedef struct _thread_spec_data {
   cl_gpgpu gpgpu ;
   int valid;
-}cl_thread_spec_data;
+  void* thread_batch_buf;
+  int thread_magic;
+} thread_spec_data;
+
+typedef struct _queue_thread_private {
+  thread_spec_data**  threads_data;
+  int threads_data_num;
+  pthread_mutex_t thread_data_lock;
+} queue_thread_private;
+
+static void thread_data_destructor(void *dummy) {
+  pthread_mutex_lock(&thread_queue_map_lock);
+  thread_slot_map[thread_id] = 0;
+  pthread_mutex_unlock(&thread_queue_map_lock);
+  free(dummy);
+}
+
+static thread_spec_data * __create_thread_spec_data(cl_command_queue queue, int create)
+{
+  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
+  thread_spec_data* spec = NULL;
+  int i = 0;
+
+  if (thread_id == -1) {
+    void * dummy = malloc(sizeof(int));
+
+    pthread_mutex_lock(&thread_queue_map_lock);
+    for (i = 0; i < thread_array_num; i++) {
+      if (thread_slot_map[i] == 0) {
+        thread_id = i;
+        break;
+      }
+    }
+
+    if (i == thread_array_num) {
+      thread_array_num *= 2;
+      thread_slot_map = realloc(thread_slot_map, sizeof(int) * thread_array_num);
+      memset(thread_slot_map + thread_array_num/2, 0, sizeof(int) * (thread_array_num/2));
+      thread_id = thread_array_num/2;
+    }
+
+    thread_slot_map[thread_id] = 1;
+
+    thread_magic = thread_magic_num++;
+    pthread_mutex_unlock(&thread_queue_map_lock);
 
-void cl_set_thread_batch_buf(void* buf) {
-  if (thread_batch_buf) {
-    cl_gpgpu_unref_batch_buf(thread_batch_buf);
+    pthread_setspecific(destroy_key, dummy);
+  }
+
+  pthread_mutex_lock(&thread_private->thread_data_lock);
+  if (thread_array_num > thread_private->threads_data_num) {// just enlarge
+    int old_num = thread_private->threads_data_num;
+    thread_private->threads_data_num = thread_array_num;
+    thread_private->threads_data = realloc(thread_private->threads_data,
+                thread_private->threads_data_num * sizeof(void *));
+    memset(thread_private->threads_data + old_num, 0,
+           sizeof(void*) * (thread_private->threads_data_num - old_num));
   }
-  thread_batch_buf = buf;
-}
 
-void* cl_get_thread_batch_buf(void) {
-  return thread_batch_buf;
+  assert(thread_id != -1 && thread_id < thread_array_num);
+  spec = thread_private->threads_data[thread_id];
+  if (!spec && create) {
+       spec = CALLOC(thread_spec_data);
+       spec->thread_magic = thread_magic;
+       thread_private->threads_data[thread_id] = spec;
+  }
+
+  pthread_mutex_unlock(&thread_private->thread_data_lock);
+
+  return spec;
 }
 
-cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
+void* cl_thread_data_create(void)
 {
-  pthread_key_t* key = queue->thread_data;
-  cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
-
-  if (!thread_spec_data) {
-    TRY_ALLOC_NO_ERR(thread_spec_data, CALLOC(struct _cl_thread_spec_data));
-    if (pthread_setspecific(*key, thread_spec_data)) {
-      cl_free(thread_spec_data);
-      return NULL;
-    }
-  }
+  queue_thread_private* thread_private = CALLOC(queue_thread_private);
+
+  if (thread_private == NULL)
+    return NULL;
 
-  if (!thread_spec_data->valid) {
-    TRY_ALLOC_NO_ERR(thread_spec_data->gpgpu, cl_gpgpu_new(queue->ctx->drv));
-    thread_spec_data->valid = 1;
+  if (thread_slot_map == NULL) {
+    pthread_mutex_lock(&thread_queue_map_lock);
+    thread_slot_map = calloc(thread_array_num, sizeof(int));
+    pthread_mutex_unlock(&thread_queue_map_lock);
+
+    pthread_key_create(&destroy_key, thread_data_destructor);
   }
 
-error:
-  return thread_spec_data->gpgpu;
+  pthread_mutex_init(&thread_private->thread_data_lock, NULL);
+
+  pthread_mutex_lock(&thread_private->thread_data_lock);
+  thread_private->threads_data = malloc(thread_array_num * sizeof(void *));
+  memset(thread_private->threads_data, 0, sizeof(void*) * thread_array_num);
+  thread_private->threads_data_num = thread_array_num;
+  pthread_mutex_unlock(&thread_private->thread_data_lock);
+
+  return thread_private;
 }
 
-void cl_invalid_thread_gpgpu(cl_command_queue queue)
+cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue)
 {
-  pthread_key_t* key = queue->thread_data;
-  cl_thread_spec_data* thread_spec_data = pthread_getspecific(*key);
+  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
 
-  if (!thread_spec_data) {
-    return;
+  if (!spec->thread_magic && spec->thread_magic != thread_magic) {
+    //We may get the slot from last thread. So free the resource.
+    spec->valid = 0;
   }
 
-  if (!thread_spec_data->valid) {
-    return;
+  if (!spec->valid) {
+    if (spec->thread_batch_buf) {
+      cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
+      spec->thread_batch_buf = NULL;
+    }
+    if (spec->gpgpu) {
+      cl_gpgpu_delete(spec->gpgpu);
+      spec->gpgpu = NULL;
+    }
+    TRY_ALLOC_NO_ERR(spec->gpgpu, cl_gpgpu_new(queue->ctx->drv));
+    spec->valid = 1;
   }
 
-  assert(thread_spec_data->gpgpu);
-  cl_gpgpu_delete(thread_spec_data->gpgpu);
-  thread_spec_data->valid = 0;
+ error:
+  return spec->gpgpu;
 }
 
-static void thread_data_destructor(void *data) {
-  cl_thread_spec_data* thread_spec_data = (cl_thread_spec_data *)data;
+void cl_set_thread_batch_buf(cl_command_queue queue, void* buf)
+{
+  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
+
+  assert(spec && spec->thread_magic == thread_magic);
 
-  if (thread_batch_buf) {
-    cl_gpgpu_unref_batch_buf(thread_batch_buf);
-    thread_batch_buf = NULL;
+  if (spec->thread_batch_buf) {
+    cl_gpgpu_unref_batch_buf(spec->thread_batch_buf);
   }
+  spec->thread_batch_buf = buf;
+}
+
+void* cl_get_thread_batch_buf(cl_command_queue queue) {
+  thread_spec_data* spec = __create_thread_spec_data(queue, 1);
 
-  if (thread_spec_data->valid)
-    cl_gpgpu_delete(thread_spec_data->gpgpu);
-  cl_free(thread_spec_data);
+  assert(spec && spec->thread_magic == thread_magic);
+
+  return spec->thread_batch_buf;
 }
 
-/* Create the thread specific data. */
-void* cl_thread_data_create(void)
+void cl_invalid_thread_gpgpu(cl_command_queue queue)
 {
-  int rc = 0;
+  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
+  thread_spec_data* spec = NULL;
 
-  pthread_key_t *thread_specific_key = CALLOC(pthread_key_t);
-  if (thread_specific_key == NULL)
-    return NULL;
-
-  rc = pthread_key_create(thread_specific_key, thread_data_destructor);
+  pthread_mutex_lock(&thread_private->thread_data_lock);
+  spec = thread_private->threads_data[thread_id];
+  assert(spec);
+  pthread_mutex_unlock(&thread_private->thread_data_lock);
 
-  if (rc != 0)
-    return NULL;
+  if (!spec->valid) {
+    return;
+  }
 
-  return thread_specific_key;
+  assert(spec->gpgpu);
+  cl_gpgpu_delete(spec->gpgpu);
+  spec->gpgpu = NULL;
+  spec->valid = 0;
 }
 
 /* The destructor for clean the thread specific data. */
-void cl_thread_data_destroy(void * data)
+void cl_thread_data_destroy(cl_command_queue queue)
 {
-  pthread_key_t *thread_specific_key = (pthread_key_t *)data;
-
-  /* First release self spec data. */
-  cl_thread_spec_data* thread_spec_data =
-         pthread_getspecific(*thread_specific_key);
-  if (thread_spec_data && thread_spec_data->valid) {
-    cl_gpgpu_delete(thread_spec_data->gpgpu);
-    if (thread_spec_data)
-      cl_free(thread_spec_data);
+  int i = 0;
+  queue_thread_private *thread_private = ((queue_thread_private *)(queue->thread_data));
+  int threads_data_num;
+  thread_spec_data** threads_data;
+
+  pthread_mutex_lock(&thread_private->thread_data_lock);
+  assert(thread_private->threads_data_num == thread_array_num);
+  threads_data_num = thread_private->threads_data_num;
+  threads_data = thread_private->threads_data;
+  thread_private->threads_data_num = 0;
+  thread_private->threads_data = NULL;
+  pthread_mutex_unlock(&thread_private->thread_data_lock);
+  cl_free(thread_private);
+  queue->thread_data = NULL;
+
+  for (i = 0; i < threads_data_num; i++) {
+    if (threads_data[i] != NULL && threads_data[i]->thread_batch_buf) {
+      cl_gpgpu_unref_batch_buf(threads_data[i]->thread_batch_buf);
+      threads_data[i]->thread_batch_buf = NULL;
+    }
+
+    if (threads_data[i] != NULL && threads_data[i]->valid) {
+      cl_gpgpu_delete(threads_data[i]->gpgpu);
+      threads_data[i]->gpgpu = NULL;
+      threads_data[i]->valid = 0;
+    }
+    cl_free(threads_data[i]);
   }
 
-  pthread_key_delete(*thread_specific_key);
-  cl_free(thread_specific_key);
+  cl_free(threads_data);
 }
index c8ab63c..bf855a2 100644 (file)
@@ -27,7 +27,7 @@
 void* cl_thread_data_create(void);
 
 /* The destructor for clean the thread specific data. */
-void cl_thread_data_destroy(void * data);
+void cl_thread_data_destroy(cl_command_queue queue);
 
 /* Used to get the gpgpu struct of each thread. */
 cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
@@ -36,9 +36,9 @@ cl_gpgpu cl_get_thread_gpgpu(cl_command_queue queue);
 void cl_invalid_thread_gpgpu(cl_command_queue queue);
 
 /* Used to set the batch buffer of each thread. */
-void cl_set_thread_batch_buf(void* buf);
+void cl_set_thread_batch_buf(cl_command_queue queue, void* buf);
 
 /* Used to get the batch buffer of each thread. */
-void* cl_get_thread_batch_buf(void);
+void* cl_get_thread_batch_buf(cl_command_queue queue);
 
 #endif /* __CL_THREAD_H__ */