X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fconnectivity%2Fsrc%2Fcaqueueingthread.c;h=0eb1f3caab7b755f02ce12d16ec448bf2db65d6b;hb=refs%2Ftags%2Ftizen_4.0.m2_release;hp=754b1c29392f53df5df6c84ce18b94dc1a83c1c3;hpb=bb93e3a07afd2126aa7665c4c56de50e2a1c9bfa;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/connectivity/src/caqueueingthread.c b/resource/csdk/connectivity/src/caqueueingthread.c index 754b1c2..0eb1f3c 100644 --- a/resource/csdk/connectivity/src/caqueueingthread.c +++ b/resource/csdk/connectivity/src/caqueueingthread.c @@ -18,13 +18,23 @@ * ******************************************************************/ +#ifdef __TIZENRT__ +#include +#endif + +#include "iotivity_config.h" #include #include #include +#ifdef HAVE_UNISTD_H #include +#endif +#ifdef HAVE_SYS_TYPES_H #include +#endif #include "caqueueingthread.h" +#include "camessagehandler.h" #include "oic_malloc.h" #include "logger.h" @@ -39,14 +49,13 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) if (NULL == thread) { OIC_LOG(ERROR, TAG, "thread data passing error!!"); - return; } while (!thread->isStop) { // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // if queue is empty, thread will wait if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0) @@ -54,7 +63,7 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) OIC_LOG(DEBUG, TAG, "wait.."); // wait - ca_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); OIC_LOG(DEBUG, TAG, "wake up.."); } @@ -63,14 +72,14 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) if (thread->isStop) { // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); continue; } // get data u_queue_message_t *message = u_queue_get_element(thread->dataQueue); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); if (NULL == message) { continue; @@ -92,31 +101,9 @@ static void CAQueueingThreadBaseRoutine(void *threadValue) OICFree(message); } - // remove all remained list data. - while (u_queue_get_size(thread->dataQueue) > 0) - { - // get data - u_queue_message_t *message = u_queue_get_element(thread->dataQueue); - - // free - if(NULL != message) - { - if (NULL != thread->destroy) - { - thread->destroy(message->msg, message->size); - } - else - { - OICFree(message->msg); - } - - OICFree(message); - } - } - - ca_mutex_lock(thread->threadMutex); - ca_cond_signal(thread->threadCond); - ca_mutex_unlock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); + oc_cond_signal(thread->threadCond); + oc_mutex_unlock(thread->threadMutex); OIC_LOG(DEBUG, TAG, "message handler main thread end.."); } @@ -141,36 +128,41 @@ CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool // set send thread data thread->threadPool = handle; thread->dataQueue = u_queue_create(); - thread->threadMutex = ca_mutex_new(); - thread->threadCond = ca_cond_new(); + thread->threadMutex = oc_mutex_new(); + thread->threadCond = oc_cond_new(); thread->isStop = true; thread->threadTask = task; thread->destroy = destroy; - if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond) + if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond) + { goto ERROR_MEM_FAILURE; + } return CA_STATUS_OK; - ERROR_MEM_FAILURE: - if(thread->dataQueue) + +ERROR_MEM_FAILURE: + if (thread->dataQueue) { u_queue_delete(thread->dataQueue); thread->dataQueue = NULL; } - if(thread->threadMutex) + if (thread->threadMutex) { - ca_mutex_free(thread->threadMutex); + oc_mutex_free(thread->threadMutex); thread->threadMutex = NULL; } - if(thread->threadCond) + if (thread->threadCond) { - ca_cond_free(thread->threadCond); + oc_cond_free(thread->threadCond); thread->threadCond = NULL; } return CA_MEMORY_ALLOC_FAILED; - } - +#ifndef __TIZENRT__ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) +#else +CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name) +#endif { if (NULL == thread) { @@ -191,15 +183,25 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) } // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); thread->isStop = false; // mutex unlock - ca_mutex_unlock(thread->threadMutex); - + oc_mutex_unlock(thread->threadMutex); +#ifndef __TIZENRT__ + CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, + thread, NULL); +#else CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, - thread); + thread, NULL, thread_name, + CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE); +#endif if (res != CA_STATUS_OK) { + // update thread status. + oc_mutex_lock(thread->threadMutex); + thread->isStop = true; + oc_mutex_unlock(thread->threadMutex); + OIC_LOG(ERROR, TAG, "thread pool add task error(send thread)."); } @@ -217,7 +219,6 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3 if (NULL == data || 0 == size) { OIC_LOG(ERROR, TAG, "data is empty.."); - return CA_STATUS_INVALID_PARAM; } @@ -234,16 +235,92 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3 message->size = size; // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // add thread data into list u_queue_add_element(thread->dataQueue, message); // notity the thread - ca_cond_signal(thread->threadCond); + oc_cond_signal(thread->threadCond); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "clear queue data.."); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove all remained list data. + while (u_queue_get_size(thread->dataQueue) > 0) + { + // get data + u_queue_message_t *message = u_queue_get_element(thread->dataQueue); + + // free + if (NULL != message) + { + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + + OICFree(message); + } + } + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread, + CAContextDataDestroy callback, void *ctx) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == callback) + { + OIC_LOG(ERROR, TAG, "callback is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == ctx) + { + OIC_LOG(ERROR, TAG, "ctx is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "Clear thread data according to context"); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove adapter related list data. + u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy); + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); return CA_STATUS_OK; } @@ -258,10 +335,40 @@ CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread) OIC_LOG(DEBUG, TAG, "thread destroy.."); - ca_mutex_free(thread->threadMutex); + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove all remained list data. + while (u_queue_get_size(thread->dataQueue) > 0) + { + // get data + u_queue_message_t *message = u_queue_get_element(thread->dataQueue); + + // free + if (NULL != message) + { + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + + OICFree(message); + } + } + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + oc_mutex_free(thread->threadMutex); thread->threadMutex = NULL; - ca_cond_free(thread->threadCond); + oc_cond_free(thread->threadCond); + u_queue_delete(thread->dataQueue); + thread->dataQueue = NULL; return CA_STATUS_OK; } @@ -279,18 +386,18 @@ CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread) if (!thread->isStop) { // mutex lock - ca_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // set stop flag thread->isStop = true; // notify the thread - ca_cond_signal(thread->threadCond); + oc_cond_signal(thread->threadCond); - ca_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); // mutex unlock - ca_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); } return CA_STATUS_OK;