Update snapshot(2017-12-06)
[platform/upstream/iotivity.git] / resource / csdk / connectivity / common / src / cathreadpool_pthreads.c
index d17af9c..7bb19dd 100644 (file)
 #ifndef _GNU_SOURCE
 #define _GNU_SOURCE
 #endif
+#include "iotivity_config.h"
 #include <errno.h>
-#if defined HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
 #if defined HAVE_WINSOCK2_H
 #include <winsock2.h>
 #endif
@@ -38,7 +36,8 @@
 #include "logger.h"
 #include "oic_malloc.h"
 #include "uarraylist.h"
-#include "camutex.h"
+#include "octhread.h"
+#include "ocrandom.h"
 #include "platform_features.h"
 
 #define TAG PCF("UTHREADPOOL")
@@ -50,7 +49,7 @@
 typedef struct ca_thread_pool_details_t
 {
     u_arraylist_t* threads_list;
-    ca_mutex list_lock;
+    oc_mutex list_lock;
 } ca_thread_pool_details_t;
 
 /**
@@ -63,6 +62,12 @@ typedef struct ca_thread_pool_callback_info_t
     void* data;
 } ca_thread_pool_callback_info_t;
 
+typedef struct ca_thread_pool_thread_info_t
+{
+    oc_thread thread;
+    uint32_t taskId;
+} ca_thread_pool_thread_info_t;
+
 // passthrough function to convert the pthreads call to a u_thread_func call
 void* ca_thread_pool_pthreads_delegate(void* data)
 {
@@ -110,7 +115,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_
         return CA_MEMORY_ALLOC_FAILED;
     }
 
-    (*thread_pool)->details->list_lock = ca_mutex_new();
+    (*thread_pool)->details->list_lock = oc_mutex_new();
 
     if(!(*thread_pool)->details->list_lock)
     {
@@ -123,7 +128,7 @@ CAResult_t ca_thread_pool_init(int32_t num_of_threads, ca_thread_pool_t *thread_
     if(!(*thread_pool)->details->threads_list)
     {
         OIC_LOG(ERROR, TAG, "Failed to create thread-pool list");
-        if(!ca_mutex_free((*thread_pool)->details->list_lock))
+        if(!oc_mutex_free((*thread_pool)->details->list_lock))
         {
             OIC_LOG(ERROR, TAG, "Failed to free thread-pool mutex");
         }
@@ -140,10 +145,15 @@ exit:
     return CA_STATUS_FAILED;
 }
 
-CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method,
-                                    void *data)
+#ifndef __TIZENRT__
+CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data,
+                                   uint32_t *taskId)
+#else
+CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func method, void *data,
+                                   uint32_t *taskId, const char *task_name, int stack_size)
+#endif
 {
-    OIC_LOG(DEBUG, TAG, "IN");
+    OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
 
     if(NULL == thread_pool || NULL == method)
     {
@@ -161,67 +171,136 @@ CAResult_t ca_thread_pool_add_task(ca_thread_pool_t thread_pool, ca_thread_func
     info->func = method;
     info->data = data;
 
-    pthread_t threadHandle;
-    int result = pthread_create(&threadHandle, NULL, ca_thread_pool_pthreads_delegate, info);
+    ca_thread_pool_thread_info_t *threadInfo =
+            (ca_thread_pool_thread_info_t *) OICCalloc(1, sizeof(ca_thread_pool_thread_info_t));
+    if (!threadInfo)
+    {
+        OIC_LOG(ERROR, TAG, "Memory allocation failed");
+        OICFree(info);
+        return CA_STATUS_FAILED;
+    }
+    threadInfo->taskId = OCGetRandom();
+    if (taskId)
+    {
+        *taskId = threadInfo->taskId;
+    }
 
-    if(result != 0)
+    oc_mutex_lock(thread_pool->details->list_lock);
+    bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*) threadInfo);
+    if (!addResult)
     {
-        OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result);
+        // Note that this is considered non-fatal.
+        oc_mutex_unlock(thread_pool->details->list_lock);
+        OIC_LOG(ERROR, TAG, "Arraylist add failed");
+        OICFree(info);
+        OICFree(threadInfo);
+        return CA_STATUS_FAILED;
+    }
+
+#ifndef __TIZENRT__
+    int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info);
+#else
+    int thrRet = oc_thread_new(&threadInfo->thread, ca_thread_pool_pthreads_delegate, info,
+                               task_name, stack_size);
+#endif
+    if (thrRet != 0)
+    {
+        uint32_t index = 0;
+        if (u_arraylist_get_index(thread_pool->details->threads_list, threadInfo, &index))
+        {
+            u_arraylist_remove(thread_pool->details->threads_list, index);
+        }
+        oc_mutex_unlock(thread_pool->details->list_lock);
+        OIC_LOG_V(ERROR, TAG, "Thread start failed with error %d", thrRet);
+        OICFree(info);
         return CA_STATUS_FAILED;
     }
+    OIC_LOG_V(INFO, TAG, "created thread: %p, taskId: %u", threadInfo->thread, threadInfo->taskId);
+    oc_mutex_unlock(thread_pool->details->list_lock);
 
-    ca_mutex_lock(thread_pool->details->list_lock);
-    bool addResult = u_arraylist_add(thread_pool->details->threads_list, (void*)threadHandle);
-    ca_mutex_unlock(thread_pool->details->list_lock);
+    OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
+    return CA_STATUS_OK;
+}
 
-    if(!addResult)
+CAResult_t ca_thread_pool_remove_task(ca_thread_pool_t thread_pool, uint32_t taskId)
+{
+    OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
+
+    if (!thread_pool)
     {
-        OIC_LOG_V(ERROR, TAG, "Arraylist Add failed, may not be properly joined: %d", addResult);
+        OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL");
         return CA_STATUS_FAILED;
     }
 
-    OIC_LOG(DEBUG, TAG, "OUT");
+    oc_mutex_lock(thread_pool->details->list_lock);
+    for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i)
+    {
+        ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *)
+                u_arraylist_get(thread_pool->details->threads_list, i);
+        if (threadInfo)
+        {
+            if (threadInfo->taskId == taskId)
+            {
+                OIC_LOG_V(INFO, TAG, "waiting.. thread: %p, taskId: %u", threadInfo->thread,
+                          threadInfo->taskId);
+                oc_thread_wait(threadInfo->thread);
+
+                OIC_LOG_V(INFO, TAG, "removed.. thread: %p, taskId: %u", threadInfo->thread,
+                          threadInfo->taskId);
+                u_arraylist_remove(thread_pool->details->threads_list, i);
+                oc_thread_free(threadInfo->thread);
+                OICFree(threadInfo);
+                break;
+            }
+        }
+    }
+    oc_mutex_unlock(thread_pool->details->list_lock);
+
+    OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
     return CA_STATUS_OK;
 }
 
 void ca_thread_pool_free(ca_thread_pool_t thread_pool)
 {
-    OIC_LOG(DEBUG, TAG, "IN");
+    OIC_LOG_V(DEBUG, TAG, "In %s", __func__);
 
-    if(!thread_pool)
+    if (!thread_pool)
     {
         OIC_LOG(ERROR, TAG, "Invalid parameter thread_pool was NULL");
         return;
     }
 
-    ca_mutex_lock(thread_pool->details->list_lock);
+    oc_mutex_lock(thread_pool->details->list_lock);
 
-    for(uint32_t i = 0; i<u_arraylist_length(thread_pool->details->threads_list); ++i)
+    for (uint32_t i = 0; i < u_arraylist_length(thread_pool->details->threads_list); ++i)
     {
-        pthread_t tid = (pthread_t)u_arraylist_get(thread_pool->details->threads_list, i);
-#if defined(_WIN32)
-        DWORD joinres = WaitForSingleObject(tid, INFINITE);
-        if (WAIT_OBJECT_0 != joinres)
+        ca_thread_pool_thread_info_t *threadInfo = (ca_thread_pool_thread_info_t *)
+                u_arraylist_get(thread_pool->details->threads_list, i);
+        if (threadInfo)
         {
-            OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres);
-        }
-        CloseHandle(tid);
-#else
-        int joinres = pthread_join(tid, NULL);
-        if(0 != joinres)
-        {
-            OIC_LOG_V(ERROR, TAG, "Failed to join thread at index %u with error %d", i, joinres);
-        }
+            if (threadInfo->thread)
+            {
+#ifdef __TIZEN__
+                OIC_LOG_V(INFO, TAG, "canceling.. thread: %p, taskId: %u", threadInfo->thread,
+                          threadInfo->taskId);
+                oc_thread_cancel(threadInfo->thread);
 #endif
+                OIC_LOG_V(INFO, TAG, "waiting.. thread: %p, taskId: %u", threadInfo->thread,
+                          threadInfo->taskId);
+                oc_thread_wait(threadInfo->thread);
+                oc_thread_free(threadInfo->thread);
+            }
+            OICFree(threadInfo);
+        }
     }
 
     u_arraylist_free(&(thread_pool->details->threads_list));
 
-    ca_mutex_unlock(thread_pool->details->list_lock);
-    ca_mutex_free(thread_pool->details->list_lock);
+    oc_mutex_unlock(thread_pool->details->list_lock);
+    oc_mutex_free(thread_pool->details->list_lock);
 
     OICFree(thread_pool->details);
     OICFree(thread_pool);
 
-    OIC_LOG(DEBUG, TAG, "OUT");
+    OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
 }