#include "oic_malloc.h"
#include "logger.h"
-#define TAG PCF("CA")
+#define TAG PCF("CA_QING")
static void CAQueueingThreadBaseRoutine(void *threadValue)
{
- OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
+ OIC_LOG(DEBUG, TAG, "message handler main thread start..");
CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
+ OIC_LOG(ERROR, TAG, "thread data passing error!!");
return;
}
while (!thread->isStop)
{
// mutex lock
- u_mutex_lock(thread->threadMutex);
+ ca_mutex_lock(thread->threadMutex);
// if queue is empty, thread will wait
- if (u_queue_get_size(thread->dataQueue) <= 0)
+ if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
{
- OIC_LOG_V(DEBUG, TAG, "wait..");
+ OIC_LOG(DEBUG, TAG, "wait..");
// wait
- u_cond_wait(thread->threadCond, thread->threadMutex);
+ ca_cond_wait(thread->threadCond, thread->threadMutex);
- OIC_LOG_V(DEBUG, TAG, "wake up..");
+ OIC_LOG(DEBUG, TAG, "wake up..");
}
- // mutex unlock
- u_mutex_unlock(thread->threadMutex);
+
// check stop flag
if (thread->isStop)
+ {
+ // mutex unlock
+ ca_mutex_unlock(thread->threadMutex);
continue;
+ }
// get data
u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
-
- void *data = message->msg;
+ // mutex unlock
+ ca_mutex_unlock(thread->threadMutex);
+ if (NULL == message)
+ {
+ continue;
+ }
// process data
- thread->threadTask(data);
+ thread->threadTask(message->msg);
// free
+ if (NULL != thread->destroy)
+ {
+ thread->destroy(message->msg, message->size);
+ }
+ else
+ {
+ OICFree(message->msg);
+ }
+
OICFree(message);
}
- u_cond_signal(thread->threadCond);
+ // remove all remained list data.
+ while (u_queue_get_size(thread->dataQueue) > 0)
+ {
+ // get data
+ u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
+
+ // free
+ if(NULL != message)
+ {
+ if (NULL != thread->destroy)
+ {
+ thread->destroy(message->msg, message->size);
+ }
+ else
+ {
+ OICFree(message->msg);
+ }
+
+ OICFree(message);
+ }
+ }
+
+ ca_mutex_lock(thread->threadMutex);
+ ca_cond_signal(thread->threadCond);
+ ca_mutex_unlock(thread->threadMutex);
- OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
+ OIC_LOG(DEBUG, TAG, "message handler main thread end..");
}
-CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
- CAThreadTask task)
+CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
+ CAThreadTask task, CADataDestroyFunction destroy)
{
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- if (handle == NULL)
+ if (NULL == handle)
{
- OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- OIC_LOG_V(DEBUG, TAG, "thread initialize..");
-
- memset(thread, 0, sizeof(CAQueueingThread_t));
-
- // mutex init
- u_mutex_init();
+ OIC_LOG(DEBUG, TAG, "thread initialize..");
// set send thread data
thread->threadPool = handle;
thread->dataQueue = u_queue_create();
- thread->threadMutex = u_mutex_new();
- thread->threadCond = u_cond_new();
- thread->isStop = CA_TRUE;
+ thread->threadMutex = ca_mutex_new();
+ thread->threadCond = ca_cond_new();
+ thread->isStop = true;
thread->threadTask = task;
+ thread->destroy = destroy;
+ if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
+ goto ERROR_MEM_FAILURE;
return CA_STATUS_OK;
+ ERROR_MEM_FAILURE:
+ if(thread->dataQueue)
+ {
+ u_queue_delete(thread->dataQueue);
+ thread->dataQueue = NULL;
+ }
+ if(thread->threadMutex)
+ {
+ ca_mutex_free(thread->threadMutex);
+ thread->threadMutex = NULL;
+ }
+ if(thread->threadCond)
+ {
+ ca_cond_free(thread->threadCond);
+ thread->threadCond = NULL;
+ }
+ return CA_MEMORY_ALLOC_FAILED;
+
}
CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
{
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- if (thread->threadPool == NULL)
+ if (NULL == thread->threadPool)
{
- OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- if (CA_FALSE == thread->isStop) //Queueing thread already running
+ if (false == thread->isStop) //Queueing thread already running
{
- OIC_LOG_V(DEBUG, TAG, "queueing thread already running..");
+ OIC_LOG(DEBUG, TAG, "queueing thread already running..");
return CA_STATUS_OK;
}
- thread->isStop = CA_FALSE;
- CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
+ // mutex lock
+ ca_mutex_lock(thread->threadMutex);
+ thread->isStop = false;
+ // mutex unlock
+ ca_mutex_unlock(thread->threadMutex);
+
+ CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
thread);
if (res != CA_STATUS_OK)
{
- OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
- thread->isStop = CA_TRUE;
- return res;
+ OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
}
return res;
CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
{
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- if (data == NULL || size == 0)
+ if (NULL == data || 0 == size)
{
- OIC_LOG_V(DEBUG, TAG, "data is empty..");
+ OIC_LOG(ERROR, TAG, "data is empty..");
- return CA_STATUS_FAILED;
+ return CA_STATUS_INVALID_PARAM;
}
// create thread data
u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
- if (message == NULL)
+ if (NULL == message)
{
- OIC_LOG_V(DEBUG, TAG, "memory error!!");
+ OIC_LOG(ERROR, TAG, "memory error!!");
return CA_MEMORY_ALLOC_FAILED;
}
- memset(message, 0, sizeof(u_queue_message_t));
message->msg = data;
- message->size = sizeof(size);
+ message->size = size;
// mutex lock
- u_mutex_lock(thread->threadMutex);
+ ca_mutex_lock(thread->threadMutex);
// add thread data into list
u_queue_add_element(thread->dataQueue, message);
// notity the thread
- u_cond_signal(thread->threadCond);
+ ca_cond_signal(thread->threadCond);
// mutex unlock
- u_mutex_unlock(thread->threadMutex);
+ ca_mutex_unlock(thread->threadMutex);
return CA_STATUS_OK;
}
CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
{
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- OIC_LOG_V(DEBUG, TAG, "thread destroy..");
+ OIC_LOG(DEBUG, TAG, "thread destroy..");
- u_mutex_free(thread->threadMutex);
+ ca_mutex_free(thread->threadMutex);
thread->threadMutex = NULL;
- u_cond_free(thread->threadCond);
+ ca_cond_free(thread->threadCond);
u_queue_delete(thread->dataQueue);
return CA_STATUS_OK;
CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
{
- if (thread == NULL)
+ if (NULL == thread)
{
- OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
- return CA_STATUS_FAILED;
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
}
- OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
+ OIC_LOG(DEBUG, TAG, "thread stop request!!");
if (!thread->isStop)
{
// mutex lock
- u_mutex_lock(thread->threadMutex);
+ ca_mutex_lock(thread->threadMutex);
// set stop flag
- thread->isStop = CA_TRUE;
+ thread->isStop = true;
- // notity the thread
- u_cond_signal(thread->threadCond);
+ // notify the thread
+ ca_cond_signal(thread->threadCond);
- u_cond_wait(thread->threadCond, thread->threadMutex);
+ ca_cond_wait(thread->threadCond, thread->threadMutex);
// mutex unlock
- u_mutex_unlock(thread->threadMutex);
+ ca_mutex_unlock(thread->threadMutex);
}
return CA_STATUS_OK;
}
-