X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fconnectivity%2Fsrc%2Fcaqueueingthread.c;h=0eb1f3caab7b755f02ce12d16ec448bf2db65d6b;hb=8229635f6d207516ccbbdf23b13be164e0fc1787;hp=3e10065c682d254389aa29adf3ef599ed74960ad;hpb=8754a91b157e957b0c0ade9f07804cde2b3cd225;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/connectivity/src/caqueueingthread.c b/resource/csdk/connectivity/src/caqueueingthread.c index 3e10065..0eb1f3c 100644 --- a/resource/csdk/connectivity/src/caqueueingthread.c +++ b/resource/csdk/connectivity/src/caqueueingthread.c @@ -18,6 +18,11 @@ * ******************************************************************/ +#ifdef __TIZENRT__ +#include +#endif + +#include "iotivity_config.h" #include #include #include @@ -29,6 +34,7 @@ #endif #include "caqueueingthread.h" +#include "camessagehandler.h" #include "oic_malloc.h" #include "logger.h" @@ -49,7 +55,7 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) while (!thread->isStop) { // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // if queue is empty, thread will wait if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0) @@ -57,7 +63,7 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) OIC_LOG(DEBUG, TAG, "wait.."); // wait - ca_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); OIC_LOG(DEBUG, TAG, "wake up.."); } @@ -66,14 +72,14 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) if (thread->isStop) { // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); continue; } // get data u_queue_message_t *message = u_queue_get_element(thread->dataQueue); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); if (NULL == message) { continue; @@ -95,9 +101,9 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) OICFree(message); } - ca_mutex_lock(thread->threadMutex); - ca_cond_signal(thread->threadCond); - ca_mutex_unlock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); + oc_cond_signal(thread->threadCond); + oc_mutex_unlock(thread->threadMutex); OIC_LOG(DEBUG, TAG, "message handler main thread end.."); } @@ -122,8 +128,8 @@ CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool // set send thread data thread->threadPool = handle; thread->dataQueue = u_queue_create(); - thread->threadMutex = ca_mutex_new(); - thread->threadCond = ca_cond_new(); + thread->threadMutex = oc_mutex_new(); + thread->threadCond = oc_cond_new(); thread->isStop = true; thread->threadTask = task; thread->destroy = destroy; @@ -142,18 +148,21 @@ ERROR_MEM_FAILURE: } if (thread->threadMutex) { - ca_mutex_free(thread->threadMutex); + oc_mutex_free(thread->threadMutex); thread->threadMutex = NULL; } if (thread->threadCond) { - ca_cond_free(thread->threadCond); + oc_cond_free(thread->threadCond); thread->threadCond = NULL; } return CA_MEMORY_ALLOC_FAILED; } - +#ifndef __TIZENRT__ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) +#else +CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name) +#endif { if (NULL == thread) { @@ -174,15 +183,25 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) } // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); thread->isStop = false; // mutex unlock - ca_mutex_unlock(thread->threadMutex); - + oc_mutex_unlock(thread->threadMutex); +#ifndef __TIZENRT__ CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, - thread); + thread, NULL); +#else + CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, + thread, NULL, thread_name, + CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE); +#endif if (res != CA_STATUS_OK) { + // update thread status. + oc_mutex_lock(thread->threadMutex); + thread->isStop = true; + oc_mutex_unlock(thread->threadMutex); + OIC_LOG(ERROR, TAG, "thread pool add task error(send thread)."); } @@ -200,7 +219,6 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3 if (NULL == data || 0 == size) { OIC_LOG(ERROR, TAG, "data is empty.."); - return CA_STATUS_INVALID_PARAM; } @@ -217,16 +235,92 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3 message->size = size; // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // add thread data into list u_queue_add_element(thread->dataQueue, message); // notity the thread - ca_cond_signal(thread->threadCond); + oc_cond_signal(thread->threadCond); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "clear queue data.."); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove all remained list data. + while (u_queue_get_size(thread->dataQueue) > 0) + { + // get data + u_queue_message_t *message = u_queue_get_element(thread->dataQueue); + + // free + if (NULL != message) + { + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + + OICFree(message); + } + } + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread, + CAContextDataDestroy callback, void *ctx) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == callback) + { + OIC_LOG(ERROR, TAG, "callback is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == ctx) + { + OIC_LOG(ERROR, TAG, "ctx is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "Clear thread data according to context"); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove adapter related list data. + u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy); + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); return CA_STATUS_OK; } @@ -241,9 +335,8 @@ CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread) OIC_LOG(DEBUG, TAG, "thread destroy.."); - ca_mutex_free(thread->threadMutex); - thread->threadMutex = NULL; - ca_cond_free(thread->threadCond); + // mutex lock + oc_mutex_lock(thread->threadMutex); // remove all remained list data. while (u_queue_get_size(thread->dataQueue) > 0) @@ -267,7 +360,15 @@ CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread) } } + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + oc_mutex_free(thread->threadMutex); + thread->threadMutex = NULL; + oc_cond_free(thread->threadCond); + u_queue_delete(thread->dataQueue); + thread->dataQueue = NULL; return CA_STATUS_OK; } @@ -285,18 +386,18 @@ CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread) if (!thread->isStop) { // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // set stop flag thread->isStop = true; // notify the thread - ca_cond_signal(thread->threadCond); + oc_cond_signal(thread->threadCond); - ca_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); } return CA_STATUS_OK;