Revert "Avoid adding elements to QueueingThread if it's already stopped."
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
index 18da00d..0afeed9 100644 (file)
  *
  ******************************************************************/
 
+#ifdef __TIZENRT__
+#include <tinyara/config.h>
+#endif
+
+#include "iotivity_config.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
-#include <sys/syscall.h>
+#endif
+#ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
+#endif
 
 #include "caqueueingthread.h"
+#include "camessagehandler.h"
 #include "oic_malloc.h"
 #include "logger.h"
 
-#define TAG PCF("CA")
+#define TAG PCF("OIC_CA_QING")
 
 static void CAQueueingThreadBaseRoutine(void *threadValue)
 {
-    OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
+    OIC_LOG(DEBUG, TAG, "message handler main thread start..");
 
     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
 
-    if (thread == NULL)
+    if (NULL == thread)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
-
+        OIC_LOG(ERROR, TAG, "thread data passing error!!");
         return;
     }
 
     while (!thread->isStop)
     {
         // mutex lock
-        u_mutex_lock(thread->threadMutex);
+        oc_mutex_lock(thread->threadMutex);
 
         // if queue is empty, thread will wait
-        if (u_queue_get_size(thread->dataQueue) <= 0)
+        if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
         {
-            OIC_LOG_V(DEBUG, TAG, "wait..");
-            
+            OIC_LOG(DEBUG, TAG, "wait..");
+
             // wait
-            u_cond_wait(thread->threadCond, thread->threadMutex);
+            oc_cond_wait(thread->threadCond, thread->threadMutex);
 
-            OIC_LOG_V(DEBUG, TAG, "wake up..");
+            OIC_LOG(DEBUG, TAG, "wake up..");
         }
 
-        // mutex unlock
-        u_mutex_unlock(thread->threadMutex);
-
         // check stop flag
         if (thread->isStop)
+        {
+            // mutex unlock
+            oc_mutex_unlock(thread->threadMutex);
             continue;
+        }
 
         // get data
         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
-
-        void *data = message->msg;
+        // mutex unlock
+        oc_mutex_unlock(thread->threadMutex);
+        if (NULL == message)
+        {
+            continue;
+        }
 
         // process data
-        thread->threadTask(data);
+        thread->threadTask(message->msg);
 
         // free
+        if (NULL != thread->destroy)
+        {
+            thread->destroy(message->msg, message->size);
+        }
+        else
+        {
+            OICFree(message->msg);
+        }
+
+        OICFree(message);
     }
 
-    u_cond_signal(thread->threadCond);
+    oc_mutex_lock(thread->threadMutex);
+    oc_cond_signal(thread->threadCond);
+    oc_mutex_unlock(thread->threadMutex);
 
-    OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
+    OIC_LOG(DEBUG, TAG, "message handler main thread end..");
 }
 
-CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
-                                      CAThreadTask task)
+CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
+                                      CAThreadTask task, CADataDestroyFunction destroy)
 {
-    if (thread == NULL)
+    if (NULL == thread)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    if (handle == NULL)
+    if (NULL == handle)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    OIC_LOG_V(DEBUG, TAG, "thread initialize..");
-
-    memset(thread, 0, sizeof(CAQueueingThread_t));
-
-    // mutex init
-    u_mutex_init();
+    OIC_LOG(DEBUG, TAG, "thread initialize..");
 
     // set send thread data
     thread->threadPool = handle;
     thread->dataQueue = u_queue_create();
-    thread->threadMutex = u_mutex_new();
-    thread->threadCond = u_cond_new();
-    thread->isStop = CA_TRUE;
+    thread->threadMutex = oc_mutex_new();
+    thread->threadCond = oc_cond_new();
+    thread->isStop = true;
     thread->threadTask = task;
+    thread->destroy = destroy;
+    if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
+    {
+        goto ERROR_MEM_FAILURE;
+    }
 
     return CA_STATUS_OK;
-}
 
+ERROR_MEM_FAILURE:
+    if (thread->dataQueue)
+    {
+        u_queue_delete(thread->dataQueue);
+        thread->dataQueue = NULL;
+    }
+    if (thread->threadMutex)
+    {
+        oc_mutex_free(thread->threadMutex);
+        thread->threadMutex = NULL;
+    }
+    if (thread->threadCond)
+    {
+        oc_cond_free(thread->threadCond);
+        thread->threadCond = NULL;
+    }
+    return CA_MEMORY_ALLOC_FAILED;
+}
+#ifndef __TIZENRT__
 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
+#else
+CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
+#endif
 {
-    if (thread == NULL)
+    if (NULL == thread)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    if (thread->threadPool == NULL)
+    if (NULL == thread->threadPool)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
-                                            thread);
+    if (false == thread->isStop) //Queueing thread already running
+    {
+        OIC_LOG(DEBUG, TAG, "queueing thread already running..");
+        return CA_STATUS_OK;
+    }
 
+    // mutex lock
+    oc_mutex_lock(thread->threadMutex);
+    thread->isStop = false;
+    // mutex unlock
+    oc_mutex_unlock(thread->threadMutex);
+#ifndef __TIZENRT__
+    CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
+                                             thread, NULL);
+#else
+    CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
+                                             thread, NULL, thread_name,
+                                             CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
+#endif
     if (res != CA_STATUS_OK)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
-        return res;
-    }
+        // update thread status.
+        oc_mutex_lock(thread->threadMutex);
+        thread->isStop = true;
+        oc_mutex_unlock(thread->threadMutex);
 
-    thread->isStop = CA_FALSE;
+        OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
+    }
 
     return res;
 }
 
 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
 {
-    if (thread == NULL)
+    if (NULL == thread)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    if (data == NULL || size == 0)
+    if (NULL == data || 0 == size)
     {
-        OIC_LOG_V(DEBUG, TAG, "data is empty..");
-
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "data is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
     // create thread data
     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
 
-    if (message == NULL)
+    if (NULL == message)
     {
-        OIC_LOG_V(DEBUG, TAG, "memory error!!");
+        OIC_LOG(ERROR, TAG, "memory error!!");
         return CA_MEMORY_ALLOC_FAILED;
     }
-    memset(message, 0, sizeof(u_queue_message_t));
 
     message->msg = data;
-    message->size = sizeof(size);
+    message->size = size;
 
     // mutex lock
-    u_mutex_lock(thread->threadMutex);
+    oc_mutex_lock(thread->threadMutex);
 
     // add thread data into list
     u_queue_add_element(thread->dataQueue, message);
 
     // notity the thread
-    u_cond_signal(thread->threadCond);
+    oc_cond_signal(thread->threadCond);
 
     // mutex unlock
-    u_mutex_unlock(thread->threadMutex);
+    oc_mutex_unlock(thread->threadMutex);
+
+    return CA_STATUS_OK;
+}
+
+CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
+{
+    if (NULL == thread)
+    {
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
+    }
+
+    OIC_LOG(DEBUG, TAG, "clear queue data..");
+
+    // mutex lock
+    oc_mutex_lock(thread->threadMutex);
+
+    // remove all remained list data.
+    while (u_queue_get_size(thread->dataQueue) > 0)
+    {
+        // get data
+        u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
+
+        // free
+        if (NULL != message)
+        {
+            if (NULL != thread->destroy)
+            {
+                thread->destroy(message->msg, message->size);
+            }
+            else
+            {
+                OICFree(message->msg);
+            }
+
+            OICFree(message);
+        }
+    }
+
+    // mutex unlock
+    oc_mutex_unlock(thread->threadMutex);
+
+    return CA_STATUS_OK;
+}
+
+CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
+                                            CAContextDataDestroy callback, void *ctx)
+{
+    if (NULL == thread)
+    {
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
+    }
+
+    if (NULL == callback)
+    {
+        OIC_LOG(ERROR, TAG, "callback is NULL..");
+        return CA_STATUS_INVALID_PARAM;
+    }
+
+    if (NULL == ctx)
+    {
+        OIC_LOG(ERROR, TAG, "ctx is NULL..");
+        return CA_STATUS_INVALID_PARAM;
+    }
+
+    OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
+
+    // mutex lock
+    oc_mutex_lock(thread->threadMutex);
+
+    // remove adapter related list data.
+    u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
+
+    // mutex unlock
+    oc_mutex_unlock(thread->threadMutex);
 
     return CA_STATUS_OK;
 }
 
 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
 {
-    if (thread == NULL)
+    if (NULL == thread)
+    {
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
+    }
+
+    OIC_LOG(DEBUG, TAG, "thread destroy..");
+
+    // mutex lock
+    oc_mutex_lock(thread->threadMutex);
+
+    // remove all remained list data.
+    while (u_queue_get_size(thread->dataQueue) > 0)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
-        return CA_STATUS_FAILED;
+        // get data
+        u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
+
+        // free
+        if (NULL != message)
+        {
+            if (NULL != thread->destroy)
+            {
+                thread->destroy(message->msg, message->size);
+            }
+            else
+            {
+                OICFree(message->msg);
+            }
+
+            OICFree(message);
+        }
     }
 
-    OIC_LOG_V(DEBUG, TAG, "thread destroy..");
+    // mutex unlock
+    oc_mutex_unlock(thread->threadMutex);
 
-    u_mutex_free(thread->threadMutex);
+    oc_mutex_free(thread->threadMutex);
     thread->threadMutex = NULL;
-    u_cond_free(thread->threadCond);
+    oc_cond_free(thread->threadCond);
+    thread->threadCond = NULL;
+
     u_queue_delete(thread->dataQueue);
+    thread->dataQueue = NULL;
 
     return CA_STATUS_OK;
 }
 
 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
 {
-    if (thread == NULL)
+    if (NULL == thread)
     {
-        OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
-        return CA_STATUS_FAILED;
+        OIC_LOG(ERROR, TAG, "thread instance is empty..");
+        return CA_STATUS_INVALID_PARAM;
     }
 
-    OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
+    OIC_LOG(DEBUG, TAG, "thread stop request!!");
 
     if (!thread->isStop)
     {
         // mutex lock
-        u_mutex_lock(thread->threadMutex);
+        oc_mutex_lock(thread->threadMutex);
 
         // set stop flag
-        thread->isStop = CA_TRUE;
+        thread->isStop = true;
 
-        // notity the thread
-        u_cond_signal(thread->threadCond);
+        // notify the thread
+        oc_cond_signal(thread->threadCond);
 
-        u_cond_wait(thread->threadCond, thread->threadMutex);
+        oc_cond_wait(thread->threadCond, thread->threadMutex);
 
         // mutex unlock
-        u_mutex_unlock(thread->threadMutex);
+        oc_mutex_unlock(thread->threadMutex);
     }
 
     return CA_STATUS_OK;
 }
-