Implementation of threadpool with very simple join-at-end
authorErich Keane <erich.keane@intel.com>
Fri, 24 Apr 2015 18:23:21 +0000 (11:23 -0700)
committerErich Keane <erich.keane@intel.com>
Thu, 30 Apr 2015 16:47:49 +0000 (16:47 +0000)
Feedback was received that the 'dumb' threadpool implementation
was not sufficient.  This fix ensures that all threadpool items are
stored and joined at the end.

Change-Id: I47d6af18723c994efd072c08ed6766b07a212db9
Signed-off-by: Erich Keane <erich.keane@intel.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/832
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Ashok Babu Channa <ashok.channa@samsung.com>
Reviewed-by: Joseph Morrow <joseph.l.morrow@intel.com>
resource/csdk/connectivity/common/src/cathreadpool_pthreads.c
resource/csdk/connectivity/test/camutex_tests.cpp

index 2bdafab..d06b0a1 100644 (file)
  * This file provides APIs related to thread pool.
  */
 
+#define _GNU_SOURCE
+#include <errno.h>
 #include <pthread.h>
 #include "cathreadpool.h"
 #include "logger.h"
 #include "oic_malloc.h"
+#include "uarraylist.h"
+#include "camutex.h"
 
 #define TAG PCF("UTHREADPOOL")
 
@@ -37,6 +41,8 @@
  */
 typedef struct ca_thread_pool_details_t
 {
+    u_arraylist_t* threads_list;
+    ca_mutex list_lock;
 } ca_thread_pool_details_t;
 
 /**
@@ -69,7 +75,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_
 
     if(!thread_pool)
     {
-        OIC_LOG(ERROR, TAG, "Parameter thraed_pool was null!");
+        OIC_LOG(ERROR, TAG, "Parameter thread_pool was null!");
         return CA_STATUS_INVALID_PARAM;
     }
 
@@ -87,6 +93,42 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_
         return CA_MEMORY_ALLOC_FAILED;
     }
 
+    (*thread_pool)->details = OICMalloc(sizeof(struct ca_thread_pool_details_t));
+    if(!(*thread_pool)->details)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to allocate for thread-pool details");
+        OICFree(*thread_pool);
+        *thread_pool=NULL;
+        return CA_MEMORY_ALLOC_FAILED;
+    }
+
+    (*thread_pool)->details->list_lock = ca_mutex_new();
+
+    if(!(*thread_pool)->details->list_lock)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to create thread-pool mutex");
+        OICFree((*thread_pool)->details);
+        OICFree(*thread_pool);
+        *thread_pool = NULL;
+        return CA_STATUS_FAILED;
+    }
+
+    (*thread_pool)->details->threads_list = u_arraylist_create();
+
+    if(!(*thread_pool)->details->threads_list)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to create thread-pool list");
+        if(!ca_mutex_free((*thread_pool)->details->list_lock))
+        {
+            OIC_LOG(ERROR, TAG, "Failed to free thread-pool mutex");
+        }
+
+        OICFree((*thread_pool)->details);
+        OICFree(*thread_pool);
+        *thread_pool = NULL;
+        return CA_STATUS_FAILED;
+    }
+
     OIC_LOG(DEBUG, TAG, "OUT");
     return CA_STATUS_OK;
 }
@@ -122,15 +164,14 @@ CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func
         return CA_STATUS_FAILED;
     }
 
-    // detach will cause the thread to either terminate normally and clean up after
-    // itself, which prevents us from having to do any manual join/cleanup later, or
-    // it will be terminated upon application exit.
-    result = pthread_detach(threadHandle);
+    ca_mutex_lock(thread_pool->details->list_lock);
+    CAResult_t addResult = u_arraylist_add(thread_pool->details->threads_list, (void*)threadHandle);
+    ca_mutex_unlock(thread_pool->details->list_lock);
 
-    if(result != 0)
+    if(addResult != CA_STATUS_OK)
     {
-        OIC_LOG_V(ERROR, TAG, "Thread detach failed with error %d", result);
-        return CA_STATUS_FAILED;
+        OIC_LOG_V(ERROR, TAG, "Arraylist Add failed, may not be properly joined: %d", addResult);
+        return addResult;
     }
 
     OIC_LOG(DEBUG, TAG, "OUT");
@@ -141,6 +182,34 @@ void ca_thread_pool_free(ca_thread_pool_t thread_pool)
 {
     OIC_LOG(DEBUG, TAG, "IN");
 
+    if(!thread_pool)
+    {
+        OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL");
+    }
+
+    ca_mutex_lock(thread_pool->details->list_lock);
+
+    for(uint32_t i = 0; i<u_arraylist_length(thread_pool->details->threads_list); ++i)
+    {
+        pthread_t tid = (pthread_t)u_arraylist_get(thread_pool->details->threads_list, i);
+        int joinres = pthread_join(tid, NULL);
+        if(0 != joinres)
+        {
+            OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres);
+        }
+    }
+
+    CAResult_t freeres = u_arraylist_free(&(thread_pool->details->threads_list));
+    if(CA_STATUS_OK != freeres)
+    {
+        OIC_LOG_V(ERROR, TAG, "Failed to free array list, error was: %d", freeres);
+    }
+
+    ca_mutex_unlock(thread_pool->details->list_lock);
+    ca_mutex_free(thread_pool->details->list_lock);
+
+    OICFree(thread_pool->details);
     OICFree(thread_pool);
+
     OIC_LOG(DEBUG, TAG, "OUT");
 }
index f589784..98def7e 100644 (file)
@@ -258,10 +258,6 @@ TEST(ConditionTests, TC_02_SIGNAL)
         EXPECT_EQ(CA_STATUS_OK,
                   ca_thread_pool_add_task(mythreadpool, condFunc, &pData2));
 
-        //start thread
-        EXPECT_EQ(CA_STATUS_OK,
-                  ca_thread_pool_add_task(mythreadpool, condFunc, &pData2));
-
         DBG_printf("test    : sleeping\n");
 
         while (!pData1.thread_up || !pData2.thread_up)
@@ -359,10 +355,6 @@ TEST(ConditionTests, TC_03_BROADCAST)
         EXPECT_EQ(CA_STATUS_OK,
                   ca_thread_pool_add_task(mythreadpool, condFunc, &pData2));
 
-        //start thread
-        EXPECT_EQ(CA_STATUS_OK,
-                  ca_thread_pool_add_task(mythreadpool, condFunc, &pData2));
-
         DBG_printf("test    : sleeping\n");
 
         while (!pData1.thread_up || !pData2.thread_up)