*
******************************************************************/
+#ifdef __TIZENRT__
+#include <tinyara/config.h>
+#endif
+
+#include "iotivity_config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#endif
#include "caqueueingthread.h"
+#include "camessagehandler.h"
#include "oic_malloc.h"
#include "logger.h"
if (NULL == thread)
{
OIC_LOG(ERROR, TAG, "thread data passing error!!");
-
return;
}
while (!thread->isStop)
{
// mutex lock
- ca_mutex_lock(thread->threadMutex);
+ oc_mutex_lock(thread->threadMutex);
// if queue is empty, thread will wait
if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
OIC_LOG(DEBUG, TAG, "wait..");
// wait
- ca_cond_wait(thread->threadCond, thread->threadMutex);
+ oc_cond_wait(thread->threadCond, thread->threadMutex);
OIC_LOG(DEBUG, TAG, "wake up..");
}
if (thread->isStop)
{
// mutex unlock
- ca_mutex_unlock(thread->threadMutex);
+ oc_mutex_unlock(thread->threadMutex);
continue;
}
// get data
u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
// mutex unlock
- ca_mutex_unlock(thread->threadMutex);
+ oc_mutex_unlock(thread->threadMutex);
if (NULL == message)
{
continue;
OICFree(message);
}
- ca_mutex_lock(thread->threadMutex);
- ca_cond_signal(thread->threadCond);
- ca_mutex_unlock(thread->threadMutex);
+ oc_mutex_lock(thread->threadMutex);
+ oc_cond_signal(thread->threadCond);
+ oc_mutex_unlock(thread->threadMutex);
OIC_LOG(DEBUG, TAG, "message handler main thread end..");
}
// set send thread data
thread->threadPool = handle;
thread->dataQueue = u_queue_create();
- thread->threadMutex = ca_mutex_new();
- thread->threadCond = ca_cond_new();
+ thread->threadMutex = oc_mutex_new();
+ thread->threadCond = oc_cond_new();
thread->isStop = true;
thread->threadTask = task;
thread->destroy = destroy;
- if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
+ if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
+ {
goto ERROR_MEM_FAILURE;
+ }
return CA_STATUS_OK;
- ERROR_MEM_FAILURE:
- if(thread->dataQueue)
+
+ERROR_MEM_FAILURE:
+ if (thread->dataQueue)
{
u_queue_delete(thread->dataQueue);
thread->dataQueue = NULL;
}
- if(thread->threadMutex)
+ if (thread->threadMutex)
{
- ca_mutex_free(thread->threadMutex);
+ oc_mutex_free(thread->threadMutex);
thread->threadMutex = NULL;
}
- if(thread->threadCond)
+ if (thread->threadCond)
{
- ca_cond_free(thread->threadCond);
+ oc_cond_free(thread->threadCond);
thread->threadCond = NULL;
}
return CA_MEMORY_ALLOC_FAILED;
-
}
-
+#ifndef __TIZENRT__
CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
+#else
+CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
+#endif
{
if (NULL == thread)
{
}
// mutex lock
- ca_mutex_lock(thread->threadMutex);
+ oc_mutex_lock(thread->threadMutex);
thread->isStop = false;
// mutex unlock
- ca_mutex_unlock(thread->threadMutex);
-
+ oc_mutex_unlock(thread->threadMutex);
+#ifndef __TIZENRT__
+ CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
+ thread, NULL);
+#else
CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
- thread);
+ thread, NULL, thread_name,
+ CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
+#endif
if (res != CA_STATUS_OK)
{
+ // update thread status.
+ oc_mutex_lock(thread->threadMutex);
+ thread->isStop = true;
+ oc_mutex_unlock(thread->threadMutex);
+
OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
}
if (NULL == data || 0 == size)
{
OIC_LOG(ERROR, TAG, "data is empty..");
-
return CA_STATUS_INVALID_PARAM;
}
message->size = size;
// mutex lock
- ca_mutex_lock(thread->threadMutex);
+ oc_mutex_lock(thread->threadMutex);
+
+ // thread stop
+ if (thread->isStop)
+ {
+ // mutex unlock
+ oc_mutex_unlock(thread->threadMutex);
+
+ OICFree(message);
+ return CA_STATUS_FAILED;
+ }
// add thread data into list
u_queue_add_element(thread->dataQueue, message);
// notity the thread
- ca_cond_signal(thread->threadCond);
+ oc_cond_signal(thread->threadCond);
// mutex unlock
- ca_mutex_unlock(thread->threadMutex);
+ oc_mutex_unlock(thread->threadMutex);
+
+ return CA_STATUS_OK;
+}
+
+CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
+{
+ if (NULL == thread)
+ {
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
+ }
+
+ OIC_LOG(DEBUG, TAG, "clear queue data..");
+
+ // mutex lock
+ oc_mutex_lock(thread->threadMutex);
+
+ // remove all remained list data.
+ while (u_queue_get_size(thread->dataQueue) > 0)
+ {
+ // get data
+ u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
+
+ // free
+ if (NULL != message)
+ {
+ if (NULL != thread->destroy)
+ {
+ thread->destroy(message->msg, message->size);
+ }
+ else
+ {
+ OICFree(message->msg);
+ }
+
+ OICFree(message);
+ }
+ }
+
+ // mutex unlock
+ oc_mutex_unlock(thread->threadMutex);
+
+ return CA_STATUS_OK;
+}
+
+CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
+ CAContextDataDestroy callback, void *ctx)
+{
+ if (NULL == thread)
+ {
+ OIC_LOG(ERROR, TAG, "thread instance is empty..");
+ return CA_STATUS_INVALID_PARAM;
+ }
+
+ if (NULL == callback)
+ {
+ OIC_LOG(ERROR, TAG, "callback is NULL..");
+ return CA_STATUS_INVALID_PARAM;
+ }
+
+ if (NULL == ctx)
+ {
+ OIC_LOG(ERROR, TAG, "ctx is NULL..");
+ return CA_STATUS_INVALID_PARAM;
+ }
+
+ OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
+
+ // mutex lock
+ oc_mutex_lock(thread->threadMutex);
+
+ // remove adapter related list data.
+ u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
+
+ // mutex unlock
+ oc_mutex_unlock(thread->threadMutex);
return CA_STATUS_OK;
}
OIC_LOG(DEBUG, TAG, "thread destroy..");
- ca_mutex_free(thread->threadMutex);
- thread->threadMutex = NULL;
- ca_cond_free(thread->threadCond);
+ // mutex lock
+ oc_mutex_lock(thread->threadMutex);
// remove all remained list data.
while (u_queue_get_size(thread->dataQueue) > 0)
u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
// free
- if(NULL != message)
+ if (NULL != message)
{
if (NULL != thread->destroy)
{
}
}
+ // mutex unlock
+ oc_mutex_unlock(thread->threadMutex);
+
+ oc_mutex_free(thread->threadMutex);
+ thread->threadMutex = NULL;
+ oc_cond_free(thread->threadCond);
+ thread->threadCond = NULL;
+
u_queue_delete(thread->dataQueue);
+ thread->dataQueue = NULL;
return CA_STATUS_OK;
}
if (!thread->isStop)
{
// mutex lock
- ca_mutex_lock(thread->threadMutex);
+ oc_mutex_lock(thread->threadMutex);
// set stop flag
thread->isStop = true;
// notify the thread
- ca_cond_signal(thread->threadCond);
+ oc_cond_signal(thread->threadCond);
- ca_cond_wait(thread->threadCond, thread->threadMutex);
+ oc_cond_wait(thread->threadCond, thread->threadMutex);
// mutex unlock
- ca_mutex_unlock(thread->threadMutex);
+ oc_mutex_unlock(thread->threadMutex);
}
return CA_STATUS_OK;