Implementation of connectivity abstraction feature Release v0.5
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
index 0bd08eb..18da00d 100644 (file)
@@ -21,6 +21,9 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
 
 #include "caqueueingthread.h"
 #include "oic_malloc.h"
 
 #define TAG PCF("CA")
 
-static void CAQueueingThreadBaseRoutine(void* treadValue)
+static void CAQueueingThreadBaseRoutine(void *threadValue)
 {
     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
 
-    CAQueueingThread_t* thread = (CAQueueingThread_t*) treadValue;
+    CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
 
     if (thread == NULL)
     {
@@ -50,6 +53,7 @@ static void CAQueueingThreadBaseRoutine(void* treadValue)
         if (u_queue_get_size(thread->dataQueue) <= 0)
         {
             OIC_LOG_V(DEBUG, TAG, "wait..");
+            
             // wait
             u_cond_wait(thread->threadCond, thread->threadMutex);
 
@@ -64,9 +68,9 @@ static void CAQueueingThreadBaseRoutine(void* treadValue)
             continue;
 
         // get data
-        u_queue_message_tmessage = u_queue_get_element(thread->dataQueue);
+        u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
 
-        voiddata = message->msg;
+        void *data = message->msg;
 
         // process data
         thread->threadTask(data);
@@ -74,11 +78,13 @@ static void CAQueueingThreadBaseRoutine(void* treadValue)
         // free
     }
 
+    u_cond_signal(thread->threadCond);
+
     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
 }
 
-CAResult_t CAQueueingThreadInitialize(CAQueueingThread_tthread, u_thread_pool_t handle,
-        CAThreadTask task)
+CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
+                                      CAThreadTask task)
 {
     if (thread == NULL)
     {
@@ -104,13 +110,13 @@ CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t* thread, u_thread_pool_
     thread->dataQueue = u_queue_create();
     thread->threadMutex = u_mutex_new();
     thread->threadCond = u_cond_new();
-    thread->isStop = CA_FALSE;
+    thread->isStop = CA_TRUE;
     thread->threadTask = task;
 
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadStart(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -125,7 +131,7 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t* thread)
     }
 
     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
-            thread);
+                                            thread);
 
     if (res != CA_STATUS_OK)
     {
@@ -133,10 +139,12 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t* thread)
         return res;
     }
 
+    thread->isStop = CA_FALSE;
+
     return res;
 }
 
-CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint32_t size)
+CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
 {
     if (thread == NULL)
     {
@@ -152,7 +160,7 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint3
     }
 
     // create thread data
-    u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
+    u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
 
     if (message == NULL)
     {
@@ -179,7 +187,7 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint3
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadDestroy(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -190,13 +198,14 @@ CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t* thread)
     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
 
     u_mutex_free(thread->threadMutex);
+    thread->threadMutex = NULL;
     u_cond_free(thread->threadCond);
     u_queue_delete(thread->dataQueue);
 
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadStop(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -206,17 +215,22 @@ CAResult_t CAQueueingThreadStop(CAQueueingThread_t* thread)
 
     OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
 
-    // mutex lock
-    u_mutex_lock(thread->threadMutex);
+    if (!thread->isStop)
+    {
+        // mutex lock
+        u_mutex_lock(thread->threadMutex);
 
-    // set stop flag
-    thread->isStop = CA_TRUE;
+        // set stop flag
+        thread->isStop = CA_TRUE;
 
-    // notity the thread
-    u_cond_signal(thread->threadCond);
+        // notity the thread
+        u_cond_signal(thread->threadCond);
 
-    // mutex unlock
-    u_mutex_unlock(thread->threadMutex);
+        u_cond_wait(thread->threadCond, thread->threadMutex);
+
+        // mutex unlock
+        u_mutex_unlock(thread->threadMutex);
+    }
 
     return CA_STATUS_OK;
 }