#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
+#include "iotivity_config.h"
#include <errno.h>
-#include <pthread.h>
+#if defined HAVE_WINSOCK2_H
+#include <winsock2.h>
+#endif
#include "cathreadpool.h"
#include "logger.h"
#include "oic_malloc.h"
#include "uarraylist.h"
-#include "camutex.h"
+#include "octhread.h"
+#include "ocrandom.h"
+#include "platform_features.h"
#define TAG PCF("UTHREADPOOL")
typedef struct ca_thread_pool_details_t
{
u_arraylist_t* threads_list;
- ca_mutex list_lock;
+ oc_mutex list_lock;
} ca_thread_pool_details_t;
/**
void* data;
} ca_thread_pool_callback_info_t;
+typedef struct ca_thread_pool_thread_info_t
+{
+ oc_thread thread;
+ uint32_t taskId;
+} ca_thread_pool_thread_info_t;
+
// passthrough function to convert the pthreads call to a u_thread_func call
void* ca_thread_pool_pthreads_delegate(void* data)
{
return CA_MEMORY_ALLOC_FAILED;
}
- (*thread_pool)->details->list_lock = ca_mutex_new();
+ (*thread_pool)->details->list_lock = oc_mutex_new();
if(!(*thread_pool)->details->list_lock)
{
if(!(*thread_pool)->details->threads_list)
{
OIC_LOG(ERROR, TAG, "Failed to create thread-pool list");
- if(!ca_mutex_free((*thread_pool)->details->list_lock))
+ if(!oc_mutex_free((*thread_pool)->details->list_lock))
{
OIC_LOG(ERROR, TAG, "Failed to free thread-pool mutex");
}
return CA_STATUS_FAILED;
}
-CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method,
- void *data)
+#ifndef __TIZENRT__
+CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data,
+ uint32_t *taskId)
+#else
+CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data,
+ uint32_t *taskId, const char *task_name, int stack_size)
+#endif
{
- OIC_LOG(DEBUG, TAG, "IN");
+ OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
if(NULL == thread_pool || NULL == method)
{
info->func = method;
info->data = data;
- pthread_t threadHandle;
+ ca_thread_pool_thread_info_t *threadInfo =
+ (ca_thread_pool_thread_info_t *) OICCalloc(1, sizeof(ca_thread_pool_thread_info_t));
+ if (!threadInfo)
+ {
+ OIC_LOG(ERROR, TAG, "Memory allocation failed");
+ OICFree(info);
+ return CA_STATUS_FAILED;
+ }
+ threadInfo->taskId = OCGetRandom();
+ if (taskId)
+ {
+ *taskId = threadInfo->taskId;
+ }
- int result = pthread_create(&threadHandle, NULL, ca_thread_pool_pthreads_delegate, info);
+ oc_mutex_lock(thread_pool->details->list_lock);
+ bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*) threadInfo);
+ if (!addResult)
+ {
+ // Note that this is considered non-fatal.
+ oc_mutex_unlock(thread_pool->details->list_lock);
+ OIC_LOG(ERROR, TAG, "Arraylist add failed");
+ OICFree(info);
+ OICFree(threadInfo);
+ return CA_STATUS_FAILED;
+ }
- if(result != 0)
+#ifndef __TIZENRT__
+ int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info);
+#else
+ int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info,
+ task_name, stack_size);
+#endif
+ if (thrRet != 0)
{
- OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result);
+ uint32_t index = 0;
+ if (u_arraylist_get_index(thread_pool->details->threads_list, threadInfo, &index))
+ {
+ u_arraylist_remove(thread_pool->details->threads_list, index);
+ }
+ oc_mutex_unlock(thread_pool->details->list_lock);
+ OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", thrRet);
+ OICFree(info);
return CA_STATUS_FAILED;
}
+ OIC_LOG_V(DEBUG, TAG, "created taskId: %u", threadInfo->taskId);
+ oc_mutex_unlock(thread_pool->details->list_lock);
- ca_mutex_lock(thread_pool->details->list_lock);
- bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*)threadHandle);
- ca_mutex_unlock(thread_pool->details->list_lock);
+ OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
+ return CA_STATUS_OK;
+}
+
+CAResult_t ca_thread_pool_remove_task(ca_thread_pool_t thread_pool, uint32_t taskId)
+{
+ OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
- if(!addResult)
+ if (!thread_pool)
{
- OIC_LOG_V(ERROR, TAG, "Arraylist Add failed, may not be properly joined: %d", addResult);
+ OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL");
return CA_STATUS_FAILED;
}
- OIC_LOG(DEBUG, TAG, "OUT");
+ oc_mutex_lock(thread_pool->details->list_lock);
+ for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i)
+ {
+ ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *)
+ u_arraylist_get(thread_pool->details->threads_list, i);
+ if (threadInfo)
+ {
+ if (threadInfo->taskId == taskId)
+ {
+ OIC_LOG_V(INFO, TAG, "waiting.. taskId: %u", threadInfo->taskId);
+ oc_thread_wait(threadInfo->thread);
+
+ OIC_LOG_V(DEBUG, TAG, "removed taskId: %u", threadInfo->taskId);
+ u_arraylist_remove(thread_pool->details->threads_list, i);
+ oc_thread_free(threadInfo->thread);
+ OICFree(threadInfo);
+ break;
+ }
+ }
+ }
+ oc_mutex_unlock(thread_pool->details->list_lock);
+
+ OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
return CA_STATUS_OK;
}
void ca_thread_pool_free(ca_thread_pool_t thread_pool)
{
- OIC_LOG(DEBUG, TAG, "IN");
+ OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
- if(!thread_pool)
+ if (!thread_pool)
{
OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL");
return;
}
- ca_mutex_lock(thread_pool->details->list_lock);
+ oc_mutex_lock(thread_pool->details->list_lock);
- for(uint32_t i = 0; i<u_arraylist_length(thread_pool->details->threads_list); ++i)
+ for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i)
{
- pthread_t tid = (pthread_t)u_arraylist_get(thread_pool->details->threads_list, i);
- int joinres = pthread_join(tid, NULL);
- if(0 != joinres)
+ ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *)
+ u_arraylist_get(thread_pool->details->threads_list, i);
+ if (threadInfo)
{
- OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres);
+ if (threadInfo->thread)
+ {
+#ifdef __TIZEN__
+ OIC_LOG_V(INFO, TAG, "canceling.. thread: %p", threadInfo->thread);
+ oc_thread_cancel(threadInfo->thread);
+#endif
+ OIC_LOG_V(INFO, TAG, "waiting.. thread: %p", threadInfo->thread);
+ oc_thread_wait(threadInfo->thread);
+ oc_thread_free(threadInfo->thread);
+ }
+ OICFree(threadInfo);
}
}
u_arraylist_free(&(thread_pool->details->threads_list));
- ca_mutex_unlock(thread_pool->details->list_lock);
- ca_mutex_free(thread_pool->details->list_lock);
+ oc_mutex_unlock(thread_pool->details->list_lock);
+ oc_mutex_free(thread_pool->details->list_lock);
OICFree(thread_pool->details);
OICFree(thread_pool);
- OIC_LOG(DEBUG, TAG, "OUT");
+ OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
}