Fix race condition in blas_server_omp.c
authorZhiyong Dang <zhiyong.dang@ck-telecom.com>
Tue, 24 Apr 2018 02:34:53 +0000 (10:34 +0800)
committerZhiyong Dang <zhiyong.dang@ck-telecom.com>
Fri, 27 Apr 2018 09:00:42 +0000 (17:00 +0800)
Change-Id: Ic896276cd073d6b41930c7c5a29d66348cd1725d

Makefile.rule
Makefile.system
cmake/system.cmake
common.h
driver/others/blas_server_omp.c

index 62bf63d..0ce4c40 100644 (file)
@@ -60,6 +60,13 @@ VERSION = 0.3.0.dev
 # automatically detected by the the script.
 # NUM_THREADS = 24
 
+# If you have enabled USE_OPENMP and your application would call
+# OpenBLAS's caculation API in multi threads, please comment it in.
+# This flag define how many OpenBLAS's caculation API can actually
+# run in parallel. If more number threads call OpenBLAS's caculation API,
+# it would wait former API finish.
+# NUM_PARALLEL = 2
+
 # if you don't need to install the static library, please comment it in.
 # NO_STATIC = 1
 
index 142cb42..463b857 100644 (file)
@@ -184,6 +184,10 @@ endif
 
 endif
 
+ifndef NUM_PARALLEL
+NUM_PARALLEL = 1
+endif
+
 ifndef NUM_THREADS
 NUM_THREADS = $(NUM_CORES)
 endif
@@ -961,6 +965,8 @@ endif
 
 CCOMMON_OPT    += -DMAX_CPU_NUMBER=$(NUM_THREADS)
 
+CCOMMON_OPT    += -DMAX_PARALLEL_NUMBER=$(NUM_PARALLEL)
+
 ifdef USE_SIMPLE_THREADED_LEVEL3
 CCOMMON_OPT    += -DUSE_SIMPLE_THREADED_LEVEL3
 endif
index 3fdd939..6458956 100644 (file)
@@ -96,6 +96,10 @@ if (NOT CMAKE_CROSSCOMPILING)
 
 endif()
 
+if (NOT DEFINED NUM_PARALLEL)
+  set(NUM_PARALLEL 1)
+endif()
+
 if (NOT DEFINED NUM_THREADS)
   if (DEFINED NUM_CORES AND NOT NUM_CORES EQUAL 0)
     # HT?
@@ -224,6 +228,8 @@ endif ()
 
 set(CCOMMON_OPT "${CCOMMON_OPT} -DMAX_CPU_NUMBER=${NUM_THREADS}")
 
+set(CCOMMON_OPT "${CCOMMON_OPT} -DMAX_PARALLEL_NUMBER=${NUM_PARALLEL}")
+
 if (USE_SIMPLE_THREADED_LEVEL3)
   set(CCOMMON_OPT "${CCOMMON_OPT} -DUSE_SIMPLE_THREADED_LEVEL3")
 endif ()
index 5a599a5..86c33b2 100644 (file)
--- a/common.h
+++ b/common.h
@@ -179,7 +179,7 @@ extern "C" {
 
 #define ALLOCA_ALIGN 63UL
 
-#define NUM_BUFFERS (MAX_CPU_NUMBER * 2)
+#define NUM_BUFFERS (MAX_CPU_NUMBER * 2 * MAX_PARALLEL_NUMBER)
 
 #ifdef NEEDBUNDERSCORE
 #define BLASFUNC(FUNC) FUNC##_
index 8d62a81..868db3b 100644 (file)
 /* or implied, of The University of Texas at Austin.                 */
 /*********************************************************************/
 
+#if _STDC_VERSION__ >= 201112L
+#ifndef _Atomic
+#define _Atomic volatile
+#endif
+#include <stdatomic.h>
+#endif
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
 //#include <sys/mman.h>
 
 int blas_server_avail = 0;
 
-static void * blas_thread_buffer[MAX_CPU_NUMBER];
+static void * blas_thread_buffer[MAX_PARALLEL_NUMBER][MAX_CPU_NUMBER];
+#if _STDC_VERSION__ >= 201112L
+static atomic_bool blas_buffer_inuse[MAX_PARALLEL_NUMBER];
+#else
+static _Bool blas_buffer_inuse[MAX_PARALLEL_NUMBER];
+#endif
 
 void goto_set_num_threads(int num_threads) {
 
-  int i=0;
+  int i=0, j=0;
 
   if (num_threads < 1) num_threads = blas_num_threads;
 
@@ -68,15 +80,17 @@ void goto_set_num_threads(int num_threads) {
   omp_set_num_threads(blas_cpu_number);
 
   //adjust buffer for each thread
-  for(i=0; i<blas_cpu_number; i++){
-    if(blas_thread_buffer[i]==NULL){
-      blas_thread_buffer[i]=blas_memory_alloc(2);
+  for(i=0; i<MAX_PARALLEL_NUMBER; i++) {
+    for(j=0; j<blas_cpu_number; j++){
+      if(blas_thread_buffer[i][j]==NULL){
+        blas_thread_buffer[i][j]=blas_memory_alloc(2);
+      }
     }
-  }
-  for(; i<MAX_CPU_NUMBER; i++){
-    if(blas_thread_buffer[i]!=NULL){
-      blas_memory_free(blas_thread_buffer[i]);
-      blas_thread_buffer[i]=NULL;
+    for(; j<MAX_CPU_NUMBER; j++){
+      if(blas_thread_buffer[i][j]!=NULL){
+        blas_memory_free(blas_thread_buffer[i][j]);
+        blas_thread_buffer[i][j]=NULL;
+      }
     }
   }
 #if defined(ARCH_MIPS64)
@@ -92,30 +106,34 @@ void openblas_set_num_threads(int num_threads) {
 
 int blas_thread_init(void){
 
-  int i=0;
+  int i=0, j=0;
 
   blas_get_cpu_number();
 
   blas_server_avail = 1;
 
-  for(i=0; i<blas_num_threads; i++){
-    blas_thread_buffer[i]=blas_memory_alloc(2);
-  }
-  for(; i<MAX_CPU_NUMBER; i++){
-      blas_thread_buffer[i]=NULL;
+  for(i=0; i<MAX_PARALLEL_NUMBER; i++) {
+    for(j=0; j<blas_num_threads; j++){
+      blas_thread_buffer[i][j]=blas_memory_alloc(2);
+    }
+    for(; j<MAX_CPU_NUMBER; j++){
+      blas_thread_buffer[i][j]=NULL;
+    }
   }
 
   return 0;
 }
 
 int BLASFUNC(blas_thread_shutdown)(void){
-  int i=0;
+  int i=0, j=0;
   blas_server_avail = 0;
 
-  for(i=0; i<MAX_CPU_NUMBER; i++){
-    if(blas_thread_buffer[i]!=NULL){
-      blas_memory_free(blas_thread_buffer[i]);
-      blas_thread_buffer[i]=NULL;
+  for(i=0; i<MAX_PARALLEL_NUMBER; i++) {
+    for(j=0; j<MAX_CPU_NUMBER; j++){
+      if(blas_thread_buffer[i][j]!=NULL){
+        blas_memory_free(blas_thread_buffer[i][j]);
+        blas_thread_buffer[i][j]=NULL;
+      }
     }
   }
 
@@ -206,7 +224,7 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
       }
 }
 
-static void exec_threads(blas_queue_t *queue){
+static void exec_threads(blas_queue_t *queue, int buf_index){
 
   void *buffer, *sa, *sb;
   int pos=0, release_flag=0;
@@ -223,7 +241,7 @@ static void exec_threads(blas_queue_t *queue){
   if ((sa == NULL) && (sb == NULL) && ((queue -> mode & BLAS_PTHREAD) == 0)) {
 
     pos = omp_get_thread_num();
-    buffer = blas_thread_buffer[pos];
+    buffer = blas_thread_buffer[buf_index][pos];
 
     //fallback
     if(buffer==NULL) {
@@ -291,7 +309,7 @@ static void exec_threads(blas_queue_t *queue){
 
 int exec_blas(BLASLONG num, blas_queue_t *queue){
 
-  BLASLONG i;
+  BLASLONG i, buf_index;
 
   if ((num <= 0) || (queue == NULL)) return 0;
 
@@ -302,6 +320,23 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
   }
 #endif
 
+  while(true) {
+    for(i=0; i < MAX_PARALLEL_NUMBER; i++) {
+#if _STDC_VERSION__ >= 201112L
+      _Bool inuse = false;
+      if(atomic_compare_exchange_weak(&blas_buffer_inuse[i], &inuse, true)) {
+#else
+      if(blas_buffer_inuse[i] == false) {
+        blas_buffer_inuse[i] = true;
+#endif
+        buf_index = i;
+        break;
+      }
+    }
+    if(i != MAX_PARALLEL_NUMBER)
+      break;
+  }
+
 #pragma omp parallel for schedule(static)
   for (i = 0; i < num; i ++) {
 
@@ -309,9 +344,15 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
     queue[i].position = i;
 #endif
 
-    exec_threads(&queue[i]);
+    exec_threads(&queue[i], buf_index);
   }
 
+#if _STDC_VERSION__ >= 201112L
+  atomic_store(&blas_buffer_inuse[buf_index], false);
+#else
+  blas_buffer_inuse[buf_index] = false;
+#endif
+
   return 0;
 }