Implementation of connectivity abstraction feature Release v0.3
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
index 0bd08eb..11fbee2 100644 (file)
@@ -21,6 +21,9 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
 
 #include "caqueueingthread.h"
 #include "oic_malloc.h"
 
 #define TAG PCF("CA")
 
-static void CAQueueingThreadBaseRoutine(void* treadValue)
+static void CAQueueingThreadBaseRoutine(void *threadValue)
 {
     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
 
-    CAQueueingThread_t* thread = (CAQueueingThread_t*) treadValue;
+    CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
 
     if (thread == NULL)
     {
@@ -64,9 +67,9 @@ static void CAQueueingThreadBaseRoutine(void* treadValue)
             continue;
 
         // get data
-        u_queue_message_tmessage = u_queue_get_element(thread->dataQueue);
+        u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
 
-        voiddata = message->msg;
+        void *data = message->msg;
 
         // process data
         thread->threadTask(data);
@@ -74,11 +77,13 @@ static void CAQueueingThreadBaseRoutine(void* treadValue)
         // free
     }
 
+    u_cond_signal(thread->threadCond);
+
     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
 }
 
-CAResult_t CAQueueingThreadInitialize(CAQueueingThread_tthread, u_thread_pool_t handle,
-        CAThreadTask task)
+CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
+                                      CAThreadTask task)
 {
     if (thread == NULL)
     {
@@ -110,7 +115,7 @@ CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t* thread, u_thread_pool_
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadStart(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -125,7 +130,7 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t* thread)
     }
 
     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
-            thread);
+                                            thread);
 
     if (res != CA_STATUS_OK)
     {
@@ -136,7 +141,7 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t* thread)
     return res;
 }
 
-CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint32_t size)
+CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
 {
     if (thread == NULL)
     {
@@ -152,7 +157,7 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint3
     }
 
     // create thread data
-    u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
+    u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
 
     if (message == NULL)
     {
@@ -179,7 +184,7 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint3
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadDestroy(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -190,13 +195,14 @@ CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t* thread)
     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
 
     u_mutex_free(thread->threadMutex);
+    thread->threadMutex = NULL;
     u_cond_free(thread->threadCond);
     u_queue_delete(thread->dataQueue);
 
     return CA_STATUS_OK;
 }
 
-CAResult_t CAQueueingThreadStop(CAQueueingThread_tthread)
+CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
 {
     if (thread == NULL)
     {
@@ -215,6 +221,8 @@ CAResult_t CAQueueingThreadStop(CAQueueingThread_t* thread)
     // notity the thread
     u_cond_signal(thread->threadCond);
 
+    u_cond_wait(thread->threadCond, thread->threadMutex);    
+
     // mutex unlock
     u_mutex_unlock(thread->threadMutex);