X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fconnectivity%2Fcommon%2Fsrc%2Fcathreadpool_pthreads.c;h=7bb19dd54bffe54e683a83f370787a5bb98d8584;hb=d2d11fd812a38648d4c797eb7e5c872ba5961d41;hp=d17af9c3ec69b399c5d6875ccdd51909f229a2f2;hpb=1f38dc188968757d7eec20816b7964b052fe5a32;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c b/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c index d17af9c..7bb19dd 100644 --- a/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c +++ b/resource/csdk/connectivity/common/src/cathreadpool_pthreads.c @@ -27,10 +27,8 @@ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif +#include "iotivity_config.h" #include -#if defined HAVE_PTHREAD_H -#include -#endif #if defined HAVE_WINSOCK2_H #include #endif @@ -38,7 +36,8 @@ #include "logger.h" #include "oic_malloc.h" #include "uarraylist.h" -#include "camutex.h" +#include "octhread.h" +#include "ocrandom.h" #include "platform_features.h" #define TAG PCF("UTHREADPOOL") @@ -50,7 +49,7 @@ typedef struct ca_thread_pool_details_t { u_arraylist_t* threads_list; - ca_mutex list_lock; + oc_mutex list_lock; } ca_thread_pool_details_t; /** @@ -63,6 +62,12 @@ typedef struct ca_thread_pool_callback_info_t void* data; } ca_thread_pool_callback_info_t; +typedef struct ca_thread_pool_thread_info_t +{ + oc_thread thread; + uint32_t taskId; +} ca_thread_pool_thread_info_t; + // passthrough function to convert the pthreads call to a u_thread_func call void* ca_thread_pool_pthreads_delegate(void* data) { @@ -110,7 +115,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_ return CA_MEMORY_ALLOC_FAILED; } - (*thread_pool)->details->list_lock = ca_mutex_new(); + (*thread_pool)->details->list_lock = oc_mutex_new(); if(!(*thread_pool)->details->list_lock) { @@ -123,7 +128,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_ if(!(*thread_pool)->details->threads_list) { OIC_LOG(ERROR, TAG, "Failed to create thread-pool list"); - if(!ca_mutex_free((*thread_pool)->details->list_lock)) + if(!oc_mutex_free((*thread_pool)->details->list_lock)) { OIC_LOG(ERROR, TAG, "Failed to free thread-pool mutex"); } @@ -140,10 +145,15 @@ exit: return CA_STATUS_FAILED; } -CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, - void *data) +#ifndef __TIZENRT__ +CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data, + uint32_t *taskId) +#else +CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data, + uint32_t *taskId, const char *task_name, int stack_size) +#endif { - OIC_LOG(DEBUG, TAG, "IN"); + OIC_LOG_V(DEBUG, TAG, "In %s", __func__); if(NULL == thread_pool || NULL == method) { @@ -161,67 +171,136 @@ CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func info->func = method; info->data = data; - pthread_t threadHandle; - int result = pthread_create(&threadHandle, NULL, ca_thread_pool_pthreads_delegate, info); + ca_thread_pool_thread_info_t *threadInfo = + (ca_thread_pool_thread_info_t *) OICCalloc(1, sizeof(ca_thread_pool_thread_info_t)); + if (!threadInfo) + { + OIC_LOG(ERROR, TAG, "Memory allocation failed"); + OICFree(info); + return CA_STATUS_FAILED; + } + threadInfo->taskId = OCGetRandom(); + if (taskId) + { + *taskId = threadInfo->taskId; + } - if(result != 0) + oc_mutex_lock(thread_pool->details->list_lock); + bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*) threadInfo); + if (!addResult) { - OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result); + // Note that this is considered non-fatal. + oc_mutex_unlock(thread_pool->details->list_lock); + OIC_LOG(ERROR, TAG, "Arraylist add failed"); + OICFree(info); + OICFree(threadInfo); + return CA_STATUS_FAILED; + } + +#ifndef __TIZENRT__ + int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info); +#else + int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info, + task_name, stack_size); +#endif + if (thrRet != 0) + { + uint32_t index = 0; + if (u_arraylist_get_index(thread_pool->details->threads_list, threadInfo, &index)) + { + u_arraylist_remove(thread_pool->details->threads_list, index); + } + oc_mutex_unlock(thread_pool->details->list_lock); + OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", thrRet); + OICFree(info); return CA_STATUS_FAILED; } + OIC_LOG_V(INFO, TAG, "created thread: %p, taskId: %u", threadInfo->thread, threadInfo->taskId); + oc_mutex_unlock(thread_pool->details->list_lock); - ca_mutex_lock(thread_pool->details->list_lock); - bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*)threadHandle); - ca_mutex_unlock(thread_pool->details->list_lock); + OIC_LOG_V(DEBUG, TAG, "Out %s", __func__); + return CA_STATUS_OK; +} - if(!addResult) +CAResult_t ca_thread_pool_remove_task(ca_thread_pool_t thread_pool, uint32_t taskId) +{ + OIC_LOG_V(DEBUG, TAG, "In %s", __func__); + + if (!thread_pool) { - OIC_LOG_V(ERROR, TAG, "Arraylist Add failed, may not be properly joined: %d", addResult); + OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL"); return CA_STATUS_FAILED; } - OIC_LOG(DEBUG, TAG, "OUT"); + oc_mutex_lock(thread_pool->details->list_lock); + for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i) + { + ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *) + u_arraylist_get(thread_pool->details->threads_list, i); + if (threadInfo) + { + if (threadInfo->taskId == taskId) + { + OIC_LOG_V(INFO, TAG, "waiting.. thread: %p, taskId: %u", threadInfo->thread, + threadInfo->taskId); + oc_thread_wait(threadInfo->thread); + + OIC_LOG_V(INFO, TAG, "removed.. thread: %p, taskId: %u", threadInfo->thread, + threadInfo->taskId); + u_arraylist_remove(thread_pool->details->threads_list, i); + oc_thread_free(threadInfo->thread); + OICFree(threadInfo); + break; + } + } + } + oc_mutex_unlock(thread_pool->details->list_lock); + + OIC_LOG_V(DEBUG, TAG, "Out %s", __func__); return CA_STATUS_OK; } void ca_thread_pool_free(ca_thread_pool_t thread_pool) { - OIC_LOG(DEBUG, TAG, "IN"); + OIC_LOG_V(DEBUG, TAG, "In %s", __func__); - if(!thread_pool) + if (!thread_pool) { OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL"); return; } - ca_mutex_lock(thread_pool->details->list_lock); + oc_mutex_lock(thread_pool->details->list_lock); - for(uint32_t i = 0; idetails->threads_list); ++i) + for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i) { - pthread_t tid = (pthread_t)u_arraylist_get(thread_pool->details->threads_list, i); -#if defined(_WIN32) - DWORD joinres = WaitForSingleObject(tid, INFINITE); - if (WAIT_OBJECT_0 != joinres) + ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *) + u_arraylist_get(thread_pool->details->threads_list, i); + if (threadInfo) { - OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres); - } - CloseHandle(tid); -#else - int joinres = pthread_join(tid, NULL); - if(0 != joinres) - { - OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres); - } + if (threadInfo->thread) + { +#ifdef __TIZEN__ + OIC_LOG_V(INFO, TAG, "canceling.. thread: %p, taskId: %u", threadInfo->thread, + threadInfo->taskId); + oc_thread_cancel(threadInfo->thread); #endif + OIC_LOG_V(INFO, TAG, "waiting.. thread: %p, taskId: %u", threadInfo->thread, + threadInfo->taskId); + oc_thread_wait(threadInfo->thread); + oc_thread_free(threadInfo->thread); + } + OICFree(threadInfo); + } } u_arraylist_free(&(thread_pool->details->threads_list)); - ca_mutex_unlock(thread_pool->details->list_lock); - ca_mutex_free(thread_pool->details->list_lock); + oc_mutex_unlock(thread_pool->details->list_lock); + oc_mutex_free(thread_pool->details->list_lock); OICFree(thread_pool->details); OICFree(thread_pool); - OIC_LOG(DEBUG, TAG, "OUT"); + OIC_LOG_V(DEBUG, TAG, "Out %s", __func__); }