1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
22 #include <tinyara/config.h>
25 #include "iotivity_config.h"
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
36 #include "caqueueingthread.h"
37 #include "camessagehandler.h"
38 #include "oic_malloc.h"
41 #define TAG PCF("OIC_CA_QING")
/**
 * Worker routine of a queueing thread. CAQueueingThreadStart() hands this
 * function, together with the CAQueueingThread_t instance, to
 * ca_thread_pool_add_task().
 *
 * While thread->isStop is false, each pass locks threadMutex, waits on
 * threadCond if the queue is empty, takes one element from dataQueue,
 * unlocks, and passes message->msg to thread->threadTask. The payload is
 * then released through thread->destroy when that callback is set.
 *
 * NOTE(review): OICFree(message->msg) presumably is the fallback used when
 * thread->destroy is NULL, and the closing lock/signal/unlock sequence
 * presumably runs once the loop has exited so that a CAQueueingThreadStop()
 * caller blocked in oc_cond_wait() is released -- confirm both.
 *
 * @param threadValue  opaque argument, cast to CAQueueingThread_t *.
 */
43 static void CAQueueingThreadBaseRoutine(void *threadValue)
45 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
47 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
51 OIC_LOG(ERROR, TAG, "thread data passing error!!");
// keep servicing the queue until a stop has been requested
55 while (!thread->isStop)
58 oc_mutex_lock(thread->threadMutex);
60 // if the queue is empty (and no stop was requested), the thread will wait
61 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
63 OIC_LOG(DEBUG, TAG, "wait..");
// threadMutex is held here; it was locked at the top of this pass
66 oc_cond_wait(thread->threadCond, thread->threadMutex);
68 OIC_LOG(DEBUG, TAG, "wake up..");
75 oc_mutex_unlock(thread->threadMutex);
// dequeue one message, then unlock before the task callback is invoked
80 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
82 oc_mutex_unlock(thread->threadMutex);
// hand the payload to the task registered in CAQueueingThreadInitialize()
89 thread->threadTask(message->msg);
// release the payload with the user-supplied destroy callback when one is set
92 if (NULL != thread->destroy)
94 thread->destroy(message->msg, message->size);
98 OICFree(message->msg);
// signal threadCond while holding threadMutex
104 oc_mutex_lock(thread->threadMutex);
105 oc_cond_signal(thread->threadCond);
106 oc_mutex_unlock(thread->threadMutex);
108 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
/**
 * Prepares a CAQueueingThread_t for use: stores the thread-pool handle and
 * the task/destroy callbacks, creates the data queue, the mutex and the
 * condition variable, and marks the thread as stopped (isStop = true).
 *
 * If the queue, mutex or condition variable could not be created, control
 * jumps to ERROR_MEM_FAILURE; the cleanup code releases whichever of the
 * three exists, resets the fields to NULL and returns
 * CA_MEMORY_ALLOC_FAILED.
 *
 * @param thread   instance to initialize.
 * @param handle   thread pool stored in thread->threadPool.
 * @param task     callback stored in thread->threadTask.
 * @param destroy  callback stored in thread->destroy; other functions in
 *                 this file test it against NULL before calling it.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." and
 *         "thread pool handle is empty.." paths (presumably when thread or
 *         handle is NULL -- confirm), CA_MEMORY_ALLOC_FAILED when a resource
 *         could not be created; presumably CA_STATUS_OK on success.
 */
111 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
112 CAThreadTask task, CADataDestroyFunction destroy)
116 OIC_LOG(ERROR, TAG, "thread instance is empty..");
117 return CA_STATUS_INVALID_PARAM;
122 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
123 return CA_STATUS_INVALID_PARAM;
126 OIC_LOG(DEBUG, TAG, "thread initialize..");
128 // set send thread data
129 thread->threadPool = handle;
130 thread->dataQueue = u_queue_create();
131 thread->threadMutex = oc_mutex_new();
132 thread->threadCond = oc_cond_new();
// the thread starts out stopped; CAQueueingThreadStart() clears this flag
133 thread->isStop = true;
134 thread->threadTask = task;
135 thread->destroy = destroy;
// all three resources are checked together, after every field has been assigned
136 if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
138 goto ERROR_MEM_FAILURE;
// failure cleanup: release only what was actually created, then reset the field
144 if (thread->dataQueue)
146 u_queue_delete(thread->dataQueue);
147 thread->dataQueue = NULL;
149 if (thread->threadMutex)
151 oc_mutex_free(thread->threadMutex);
152 thread->threadMutex = NULL;
154 if (thread->threadCond)
156 oc_cond_free(thread->threadCond);
157 thread->threadCond = NULL;
159 return CA_MEMORY_ALLOC_FAILED;
/**
 * Starts the queueing thread: clears isStop under threadMutex and submits
 * CAQueueingThreadBaseRoutine, with the thread instance as its argument, to
 * thread->threadPool through ca_thread_pool_add_task(). If the submission
 * does not return CA_STATUS_OK, isStop is set back to true under the mutex
 * and an error is logged.
 *
 * NOTE(review): two signatures and two ca_thread_pool_add_task() calls
 * appear back to back. Presumably they are alternatives selected by a
 * preprocessor conditional; the second variant additionally takes
 * thread_name and passes it along with
 * CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE -- confirm.
 *
 * @param thread       instance prepared by CAQueueingThreadInitialize().
 * @param thread_name  (second variant only) name forwarded to
 *                     ca_thread_pool_add_task().
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." path
 *         or when thread->threadPool is NULL; otherwise presumably the
 *         result of ca_thread_pool_add_task() -- confirm.
 */
162 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
164 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
169 OIC_LOG(ERROR, TAG, "thread instance is empty..");
170 return CA_STATUS_INVALID_PARAM;
173 if (NULL == thread->threadPool)
175 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
176 return CA_STATUS_INVALID_PARAM;
// NOTE(review): isStop is read here with no lock visible before it, while the
// writes below take threadMutex -- confirm the unlocked read is intentional.
179 if (false == thread->isStop) //Queueing thread already running
181 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
// mark the thread as running before the worker task is submitted
186 oc_mutex_lock(thread->threadMutex);
187 thread->isStop = false;
189 oc_mutex_unlock(thread->threadMutex);
191 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
194 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
195 thread, NULL, thread_name,
196 CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
198 if (res != CA_STATUS_OK)
200 // update thread status: the task was not added, so mark the thread stopped again.
201 oc_mutex_lock(thread->threadMutex);
202 thread->isStop = true;
203 oc_mutex_unlock(thread->threadMutex);
205 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
/**
 * Queues one item for the worker thread: allocates a u_queue_message_t with
 * OICMalloc(), records size in it, then, holding threadMutex, appends the
 * message to dataQueue and signals threadCond.
 *
 * NOTE(review): presumably data is also stored as message->msg before the
 * message is queued (the worker routine reads message->msg and later
 * releases it), in which case the caller should not free data after a
 * successful call -- confirm.
 *
 * @param thread  queueing thread instance.
 * @param data    payload; must not be NULL.
 * @param size    payload size; must not be 0.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." path
 *         or when data is NULL or size is 0; CA_MEMORY_ALLOC_FAILED on the
 *         "memory error!!" path (presumably when OICMalloc() returns NULL);
 *         presumably CA_STATUS_OK otherwise.
 */
211 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
215 OIC_LOG(ERROR, TAG, "thread instance is empty..");
216 return CA_STATUS_INVALID_PARAM;
219 if (NULL == data || 0 == size)
221 OIC_LOG(ERROR, TAG, "data is empty..");
222 return CA_STATUS_INVALID_PARAM;
225 // create thread data
226 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
230 OIC_LOG(ERROR, TAG, "memory error!!");
231 return CA_MEMORY_ALLOC_FAILED;
235 message->size = size;
// the queue is modified and the condition signalled while threadMutex is held
238 oc_mutex_lock(thread->threadMutex);
240 // add thread data into list
241 u_queue_add_element(thread->dataQueue, message);
// signal threadCond, which the worker routine waits on when the queue is empty
244 oc_cond_signal(thread->threadCond);
247 oc_mutex_unlock(thread->threadMutex);
/**
 * Discards every message still waiting in the queue. Holding threadMutex,
 * it repeatedly takes an element from dataQueue while the queue size is
 * greater than zero and releases the payload through thread->destroy when
 * that callback is set.
 *
 * NOTE(review): OICFree(message->msg) presumably is the fallback used when
 * thread->destroy is NULL -- confirm.
 *
 * @param thread  queueing thread instance.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." path;
 *         presumably CA_STATUS_OK otherwise.
 */
252 CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
256 OIC_LOG(ERROR, TAG, "thread instance is empty..");
257 return CA_STATUS_INVALID_PARAM;
260 OIC_LOG(DEBUG, TAG, "clear queue data..");
263 oc_mutex_lock(thread->threadMutex);
265 // remove all remaining list data.
266 while (u_queue_get_size(thread->dataQueue) > 0)
269 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
// release the payload with the user-supplied destroy callback when one is set
274 if (NULL != thread->destroy)
276 thread->destroy(message->msg, message->size);
280 OICFree(message->msg);
288 oc_mutex_unlock(thread->threadMutex);
/**
 * Removes queued messages selected by a caller-supplied callback. Holding
 * threadMutex, it delegates to u_queue_remove_req_elements(), passing the
 * queue, the callback, ctx and thread->destroy.
 *
 * NOTE(review): presumably the helper removes the elements that callback
 * matches against ctx and releases them with thread->destroy -- confirm
 * against u_queue_remove_req_elements().
 *
 * @param thread    queueing thread instance.
 * @param callback  selection callback; must not be NULL.
 * @param ctx       context forwarded to the helper; the "ctx is NULL.."
 *                  error path suggests it must not be NULL.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty..",
 *         "callback is NULL.." and "ctx is NULL.." paths; presumably
 *         CA_STATUS_OK otherwise.
 */
293 CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
294 CAContextDataDestroy callback, void *ctx)
298 OIC_LOG(ERROR, TAG, "thread instance is empty..");
299 return CA_STATUS_INVALID_PARAM;
302 if (NULL == callback)
304 OIC_LOG(ERROR, TAG, "callback is NULL..");
305 return CA_STATUS_INVALID_PARAM;
310 OIC_LOG(ERROR, TAG, "ctx is NULL..");
311 return CA_STATUS_INVALID_PARAM;
314 OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
// the queue is only modified while threadMutex is held
317 oc_mutex_lock(thread->threadMutex);
319 // remove adapter related list data.
320 u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
323 oc_mutex_unlock(thread->threadMutex);
/**
 * Tears down a queueing thread instance. Holding threadMutex, it drains
 * dataQueue, releasing each payload through thread->destroy when that
 * callback is set. After unlocking it frees the mutex and the condition
 * variable, deletes the queue, and resets the three fields to NULL.
 *
 * NOTE(review): OICFree(message->msg) presumably is the fallback used when
 * thread->destroy is NULL. Nothing visible here checks whether the worker
 * routine is still running, so callers presumably must call
 * CAQueueingThreadStop() first -- confirm.
 *
 * @param thread  queueing thread instance.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." path;
 *         presumably CA_STATUS_OK otherwise.
 */
328 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
332 OIC_LOG(ERROR, TAG, "thread instance is empty..");
333 return CA_STATUS_INVALID_PARAM;
336 OIC_LOG(DEBUG, TAG, "thread destroy..");
339 oc_mutex_lock(thread->threadMutex);
341 // remove all remaining list data.
342 while (u_queue_get_size(thread->dataQueue) > 0)
345 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
// release the payload with the user-supplied destroy callback when one is set
350 if (NULL != thread->destroy)
352 thread->destroy(message->msg, message->size);
356 OICFree(message->msg);
364 oc_mutex_unlock(thread->threadMutex);
// the mutex is unlocked before it is freed; fields are reset to NULL after release
366 oc_mutex_free(thread->threadMutex);
367 thread->threadMutex = NULL;
368 oc_cond_free(thread->threadCond);
369 thread->threadCond = NULL;
371 u_queue_delete(thread->dataQueue);
372 thread->dataQueue = NULL;
/**
 * Requests the worker routine to stop. Holding threadMutex, it sets isStop
 * to true, signals threadCond, and then waits on the same condition
 * variable before unlocking.
 *
 * NOTE(review): the wait presumably blocks until the worker routine issues
 * its final oc_cond_signal() on threadCond. The wait is a single call with
 * no visible predicate loop, so its behaviour on a spurious wakeup, or when
 * no worker is running, should be confirmed.
 *
 * @param thread  queueing thread instance.
 * @return CA_STATUS_INVALID_PARAM on the "thread instance is empty.." path;
 *         presumably CA_STATUS_OK otherwise.
 */
377 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
381 OIC_LOG(ERROR, TAG, "thread instance is empty..");
382 return CA_STATUS_INVALID_PARAM;
385 OIC_LOG(DEBUG, TAG, "thread stop request!!");
390 oc_mutex_lock(thread->threadMutex);
// set the stop flag that the worker loop tests
393 thread->isStop = true;
// signal threadCond, which the worker waits on when the queue is empty
396 oc_cond_signal(thread->threadCond);
// then wait on the same condition variable with threadMutex held
398 oc_cond_wait(thread->threadCond, thread->threadMutex);
401 oc_mutex_unlock(thread->threadMutex);