From 51fe042f56a6606705d26447ef21fbf39cf86426 Mon Sep 17 00:00:00 2001 From: Erich Keane Date: Fri, 24 Apr 2015 11:23:21 -0700 Subject: [PATCH] Implementation of threadpool with very simple join-at-end Feedback was received that the 'dumb' threadpool implementation was not sufficient. This fix ensures that all threadpool items are stored and joined at the end. Change-Id: I47d6af18723c994efd072c08ed6766b07a212db9 Signed-off-by: Erich Keane Reviewed-on: https://gerrit.iotivity.org/gerrit/832 Tested-by: jenkins-iotivity Reviewed-by: Ashok Babu Channa Reviewed-by: Joseph Morrow --- .../common/src/cathreadpool_pthreads.c | 85 ++++++++++++++++++++-- resource/csdk/connectivity/test/camutex_tests.cpp | 8 -- 2 files changed, 77 insertions(+), 16 deletions(-) diff --git a/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c b/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c index 2bdafab..d06b0a1 100644 --- a/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c +++ b/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c @@ -24,10 +24,14 @@ * This file provides APIs related to thread pool. */ +#define _GNU_SOURCE +#include #include #include "cathreadpool.h" #include "logger.h" #include "oic_malloc.h" +#include "uarraylist.h" +#include "camutex.h" #define TAG PCF("UTHREADPOOL") @@ -37,6 +41,8 @@ */ typedef struct ca_thread_pool_details_t { + u_arraylist_t* threads_list; + ca_mutex list_lock; } ca_thread_pool_details_t; /** @@ -69,7 +75,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_ if(!thread_pool) { - OIC_LOG(ERROR, TAG, "Parameter thraed_pool was null!"); + OIC_LOG(ERROR, TAG, "Parameter thread_pool was null!"); return CA_STATUS_INVALID_PARAM; } @@ -87,6 +93,42 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_ return CA_MEMORY_ALLOC_FAILED; } + (*thread_pool)->details = OICMalloc(sizeof(struct ca_thread_pool_details_t)); + if(!(*thread_pool)->details) + { + OIC_LOG(ERROR, TAG, "Failed to allocate for thread-pool details"); + OICFree(*thread_pool); + *thread_pool=NULL; + return CA_MEMORY_ALLOC_FAILED; + } + + (*thread_pool)->details->list_lock = ca_mutex_new(); + + if(!(*thread_pool)->details->list_lock) + { + OIC_LOG(ERROR, TAG, "Failed to create thread-pool mutex"); + OICFree((*thread_pool)->details); + OICFree(*thread_pool); + *thread_pool = NULL; + return CA_STATUS_FAILED; + } + + (*thread_pool)->details->threads_list = u_arraylist_create(); + + if(!(*thread_pool)->details->threads_list) + { + OIC_LOG(ERROR, TAG, "Failed to create thread-pool list"); + if(!ca_mutex_free((*thread_pool)->details->list_lock)) + { + OIC_LOG(ERROR, TAG, "Failed to free thread-pool mutex"); + } + + OICFree((*thread_pool)->details); + OICFree(*thread_pool); + *thread_pool = NULL; + return CA_STATUS_FAILED; + } + OIC_LOG(DEBUG, TAG, "OUT"); return CA_STATUS_OK; } @@ -122,15 +164,14 @@ CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func return CA_STATUS_FAILED; } - // detach will cause the thread to either terminate normally and clean up after - // itself, which prevents us from having to do any manual join/cleanup later, or - // it will be terminated upon application exit. - result = pthread_detach(threadHandle); + ca_mutex_lock(thread_pool->details->list_lock); + CAResult_t addResult = u_arraylist_add(thread_pool->details->threads_list, (void*)threadHandle); + ca_mutex_unlock(thread_pool->details->list_lock); - if(result != 0) + if(addResult != CA_STATUS_OK) { - OIC_LOG_V(ERROR, TAG, "Thread detach failed with error %d", result); - return CA_STATUS_FAILED; + OIC_LOG_V(ERROR, TAG, "Arraylist Add failed, may not be properly joined: %d", addResult); + return addResult; } OIC_LOG(DEBUG, TAG, "OUT"); @@ -141,6 +182,34 @@ void ca_thread_pool_free(ca_thread_pool_t thread_pool) { OIC_LOG(DEBUG, TAG, "IN"); + if(!thread_pool) + { + OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL"); + } + + ca_mutex_lock(thread_pool->details->list_lock); + + for(uint32_t i = 0; idetails->threads_list); ++i) + { + pthread_t tid = (pthread_t)u_arraylist_get(thread_pool->details->threads_list, i); + int joinres = pthread_join(tid, NULL); + if(0 != joinres) + { + OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres); + } + } + + CAResult_t freeres = u_arraylist_free(&(thread_pool->details->threads_list)); + if(CA_STATUS_OK != freeres) + { + OIC_LOG_V(ERROR, TAG, "Failed to free array list, error was: %d", freeres); + } + + ca_mutex_unlock(thread_pool->details->list_lock); + ca_mutex_free(thread_pool->details->list_lock); + + OICFree(thread_pool->details); OICFree(thread_pool); + OIC_LOG(DEBUG, TAG, "OUT"); } diff --git a/resource/csdk/connectivity/test/camutex_tests.cpp b/resource/csdk/connectivity/test/camutex_tests.cpp index f589784..98def7e 100644 --- a/resource/csdk/connectivity/test/camutex_tests.cpp +++ b/resource/csdk/connectivity/test/camutex_tests.cpp @@ -258,10 +258,6 @@ TEST(ConditionTests, TC_02_SIGNAL) EXPECT_EQ(CA_STATUS_OK, ca_thread_pool_add_task(mythreadpool, condFunc, &pData2)); - //start thread - EXPECT_EQ(CA_STATUS_OK, - ca_thread_pool_add_task(mythreadpool, condFunc, &pData2)); - DBG_printf("test : sleeping\n"); while (!pData1.thread_up || !pData2.thread_up) @@ -359,10 +355,6 @@ TEST(ConditionTests, TC_03_BROADCAST) EXPECT_EQ(CA_STATUS_OK, ca_thread_pool_add_task(mythreadpool, condFunc, &pData2)); - //start thread - EXPECT_EQ(CA_STATUS_OK, - ca_thread_pool_add_task(mythreadpool, condFunc, &pData2)); - DBG_printf("test : sleeping\n"); while (!pData1.thread_up || !pData2.thread_up) -- 2.7.4