X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fconnectivity%2Fsrc%2Fcaqueueingthread.c;h=ba7b47867a6804efb6c4773bb935da9a4cdd145c;hb=c04ac05a42d644464490feecf905cfd4aaf64b76;hp=e12a8d46b123d53b599950a7c6a02ff2b6f64192;hpb=c1ff651a41834cdc82bb6bc67e937cc35c490625;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/connectivity/src/caqueueingthread.c b/resource/csdk/connectivity/src/caqueueingthread.c index e12a8d4..ba7b478 100644 --- a/resource/csdk/connectivity/src/caqueueingthread.c +++ b/resource/csdk/connectivity/src/caqueueingthread.c @@ -18,137 +18,191 @@ * ******************************************************************/ +#ifdef __TIZENRT__ +#include +#endif + +#include "iotivity_config.h" #include #include #include +#ifdef HAVE_UNISTD_H #include -#include +#endif +#ifdef HAVE_SYS_TYPES_H #include +#endif #include "caqueueingthread.h" +#include "camessagehandler.h" #include "oic_malloc.h" #include "logger.h" -#define TAG PCF("CA") +#define TAG PCF("OIC_CA_QING") static void CAQueueingThreadBaseRoutine(void *threadValue) { - OIC_LOG_V(DEBUG, TAG, "message handler main thread start.."); + OIC_LOG(DEBUG, TAG, "message handler main thread start.."); CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue; - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread data passing error!!"); - + OIC_LOG(ERROR, TAG, "thread data passing error!!"); return; } while (!thread->isStop) { // mutex lock - u_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // if queue is empty, thread will wait - if (u_queue_get_size(thread->dataQueue) <= 0) + if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0) { - OIC_LOG_V(DEBUG, TAG, "wait.."); + OIC_LOG(DEBUG, TAG, "wait.."); // wait - u_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); - OIC_LOG_V(DEBUG, TAG, "wake up.."); + OIC_LOG(DEBUG, TAG, "wake up.."); } - // mutex unlock - u_mutex_unlock(thread->threadMutex); - // check stop flag if (thread->isStop) + { + // mutex unlock + oc_mutex_unlock(thread->threadMutex); continue; + } // get data u_queue_message_t *message = u_queue_get_element(thread->dataQueue); - if (message == NULL) + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + if (NULL == message) { continue; } - void *data = message->msg; - // process data - thread->threadTask(data); + thread->threadTask(message->msg); // free + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + OICFree(message); } - u_cond_signal(thread->threadCond); + oc_mutex_lock(thread->threadMutex); + oc_cond_signal(thread->threadCond); + oc_mutex_unlock(thread->threadMutex); - OIC_LOG_V(DEBUG, TAG, "message handler main thread end.."); + OIC_LOG(DEBUG, TAG, "message handler main thread end.."); } -CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle, - CAThreadTask task) +CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle, + CAThreadTask task, CADataDestroyFunction destroy) { - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread instance is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; } - if (handle == NULL) + if (NULL == handle) { - OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread pool handle is empty.."); + return CA_STATUS_INVALID_PARAM; } - OIC_LOG_V(DEBUG, TAG, "thread initialize.."); - - memset(thread, 0, sizeof(CAQueueingThread_t)); - - // mutex init - u_mutex_init(); + OIC_LOG(DEBUG, TAG, "thread initialize.."); // set send thread data thread->threadPool = handle; thread->dataQueue = u_queue_create(); - thread->threadMutex = u_mutex_new(); - thread->threadCond = u_cond_new(); - thread->isStop = CA_TRUE; + thread->threadMutex = oc_mutex_new(); + thread->threadCond = oc_cond_new(); + thread->isStop = true; thread->threadTask = task; + thread->destroy = destroy; + if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond) + { + goto ERROR_MEM_FAILURE; + } return CA_STATUS_OK; -} +ERROR_MEM_FAILURE: + if (thread->dataQueue) + { + u_queue_delete(thread->dataQueue); + thread->dataQueue = NULL; + } + if (thread->threadMutex) + { + oc_mutex_free(thread->threadMutex); + thread->threadMutex = NULL; + } + if (thread->threadCond) + { + oc_cond_free(thread->threadCond); + thread->threadCond = NULL; + } + return CA_MEMORY_ALLOC_FAILED; +} +#ifndef __TIZENRT__ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) +#else +CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name) +#endif { - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread instance is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; } - if (thread->threadPool == NULL) + if (NULL == thread->threadPool) { - OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread pool handle is empty.."); + return CA_STATUS_INVALID_PARAM; } - if (CA_FALSE == thread->isStop) //Queueing thread already running + if (false == thread->isStop) //Queueing thread already running { - OIC_LOG_V(DEBUG, TAG, "queueing thread already running.."); + OIC_LOG(DEBUG, TAG, "queueing thread already running.."); return CA_STATUS_OK; } - thread->isStop = CA_FALSE; - CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, - thread); + // mutex lock + oc_mutex_lock(thread->threadMutex); + thread->isStop = false; + // mutex unlock + oc_mutex_unlock(thread->threadMutex); +#ifndef __TIZENRT__ + CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, + thread, NULL); +#else + CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine, + thread, NULL, thread_name, + CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE); +#endif if (res != CA_STATUS_OK) { - OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread)."); - thread->isStop = CA_TRUE; - return res; + // update thread status. + oc_mutex_lock(thread->threadMutex); + thread->isStop = true; + oc_mutex_unlock(thread->threadMutex); + + OIC_LOG(ERROR, TAG, "thread pool add task error(send thread)."); } return res; @@ -156,92 +210,206 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread) CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size) { - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread instance is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; } - if (data == NULL || size == 0) + if (NULL == data || 0 == size) { - OIC_LOG_V(DEBUG, TAG, "data is empty.."); - - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "data is empty.."); + return CA_STATUS_INVALID_PARAM; } // create thread data u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t)); - if (message == NULL) + if (NULL == message) { - OIC_LOG_V(DEBUG, TAG, "memory error!!"); + OIC_LOG(ERROR, TAG, "memory error!!"); return CA_MEMORY_ALLOC_FAILED; } - memset(message, 0, sizeof(u_queue_message_t)); message->msg = data; - message->size = sizeof(size); + message->size = size; // mutex lock - u_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); + + // thread stop + if (thread->isStop) + { + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + OICFree(message); + return CA_STATUS_FAILED; + } // add thread data into list u_queue_add_element(thread->dataQueue, message); // notity the thread - u_cond_signal(thread->threadCond); + oc_cond_signal(thread->threadCond); // mutex unlock - u_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "clear queue data.."); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove all remained list data. + while (u_queue_get_size(thread->dataQueue) > 0) + { + // get data + u_queue_message_t *message = u_queue_get_element(thread->dataQueue); + + // free + if (NULL != message) + { + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + + OICFree(message); + } + } + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + return CA_STATUS_OK; +} + +CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread, + CAContextDataDestroy callback, void *ctx) +{ + if (NULL == thread) + { + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == callback) + { + OIC_LOG(ERROR, TAG, "callback is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + if (NULL == ctx) + { + OIC_LOG(ERROR, TAG, "ctx is NULL.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "Clear thread data according to context"); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove adapter related list data. + u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy); + + // mutex unlock + oc_mutex_unlock(thread->threadMutex); return CA_STATUS_OK; } CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread) { - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread instance is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; + } + + OIC_LOG(DEBUG, TAG, "thread destroy.."); + + // mutex lock + oc_mutex_lock(thread->threadMutex); + + // remove all remained list data. + while (u_queue_get_size(thread->dataQueue) > 0) + { + // get data + u_queue_message_t *message = u_queue_get_element(thread->dataQueue); + + // free + if (NULL != message) + { + if (NULL != thread->destroy) + { + thread->destroy(message->msg, message->size); + } + else + { + OICFree(message->msg); + } + + OICFree(message); + } } - OIC_LOG_V(DEBUG, TAG, "thread destroy.."); + // mutex unlock + oc_mutex_unlock(thread->threadMutex); - u_mutex_free(thread->threadMutex); + oc_mutex_free(thread->threadMutex); thread->threadMutex = NULL; - u_cond_free(thread->threadCond); + oc_cond_free(thread->threadCond); + thread->threadCond = NULL; + u_queue_delete(thread->dataQueue); + thread->dataQueue = NULL; return CA_STATUS_OK; } CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread) { - if (thread == NULL) + if (NULL == thread) { - OIC_LOG_V(DEBUG, TAG, "thread instance is empty.."); - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "thread instance is empty.."); + return CA_STATUS_INVALID_PARAM; } - OIC_LOG_V(DEBUG, TAG, "thread stop request!!"); + OIC_LOG(DEBUG, TAG, "thread stop request!!"); if (!thread->isStop) { // mutex lock - u_mutex_lock(thread->threadMutex); + oc_mutex_lock(thread->threadMutex); // set stop flag - thread->isStop = CA_TRUE; + thread->isStop = true; - // notity the thread - u_cond_signal(thread->threadCond); + // notify the thread + oc_cond_signal(thread->threadCond); - u_cond_wait(thread->threadCond, thread->threadMutex); + oc_cond_wait(thread->threadCond, thread->threadMutex); // mutex unlock - u_mutex_unlock(thread->threadMutex); + oc_mutex_unlock(thread->threadMutex); } return CA_STATUS_OK; } -