1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
22 #include <tinyara/config.h>
25 #include "iotivity_config.h"
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
36 #include "caqueueingthread.h"
37 #include "camessagehandler.h"
38 #include "oic_malloc.h"
41 #define TAG PCF("OIC_CA_QING")
43 static void CAQueueingThreadBaseRoutine(void *threadValue)
45 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
47 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
51 OIC_LOG(ERROR, TAG, "thread data passing error!!");
55 if (NULL == thread->threadMutex || NULL == thread->threadCond || NULL == thread->dataQueue)
57 OIC_LOG(ERROR, TAG, "thread data was already destroyed");
61 while (!thread->isStop)
64 oc_mutex_lock(thread->threadMutex);
66 // if queue is empty, thread will wait
67 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
69 OIC_LOG(DEBUG, TAG, "wait..");
72 oc_cond_wait(thread->threadCond, thread->threadMutex);
74 OIC_LOG(DEBUG, TAG, "wake up..");
81 oc_mutex_unlock(thread->threadMutex);
86 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
88 oc_mutex_unlock(thread->threadMutex);
95 thread->threadTask(message->msg);
98 if (NULL != thread->destroy)
100 thread->destroy(message->msg, message->size);
104 OICFree(message->msg);
110 oc_mutex_lock(thread->threadMutex);
111 oc_cond_signal(thread->threadCond);
112 oc_mutex_unlock(thread->threadMutex);
114 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
117 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
118 CAThreadTask task, CADataDestroyFunction destroy)
122 OIC_LOG(ERROR, TAG, "thread instance is empty..");
123 return CA_STATUS_INVALID_PARAM;
128 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
129 return CA_STATUS_INVALID_PARAM;
132 OIC_LOG(DEBUG, TAG, "thread initialize..");
134 // set send thread data
135 thread->threadPool = handle;
136 thread->dataQueue = u_queue_create();
137 thread->threadMutex = oc_mutex_new();
138 thread->threadCond = oc_cond_new();
139 thread->isStop = true;
140 thread->threadTask = task;
141 thread->destroy = destroy;
142 if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
144 goto ERROR_MEM_FAILURE;
150 if (thread->dataQueue)
152 u_queue_delete(thread->dataQueue);
153 thread->dataQueue = NULL;
155 if (thread->threadMutex)
157 oc_mutex_free(thread->threadMutex);
158 thread->threadMutex = NULL;
160 if (thread->threadCond)
162 oc_cond_free(thread->threadCond);
163 thread->threadCond = NULL;
165 return CA_MEMORY_ALLOC_FAILED;
168 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
170 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
175 OIC_LOG(ERROR, TAG, "thread instance is empty..");
176 return CA_STATUS_INVALID_PARAM;
179 if (NULL == thread->threadPool)
181 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
182 return CA_STATUS_INVALID_PARAM;
185 if (false == thread->isStop) //Queueing thread already running
187 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
192 oc_mutex_lock(thread->threadMutex);
193 thread->isStop = false;
195 oc_mutex_unlock(thread->threadMutex);
197 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
200 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
201 thread, NULL, thread_name,
202 CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
204 if (res != CA_STATUS_OK)
206 // update thread status.
207 oc_mutex_lock(thread->threadMutex);
208 thread->isStop = true;
209 oc_mutex_unlock(thread->threadMutex);
211 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
217 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
221 OIC_LOG(ERROR, TAG, "thread instance is empty..");
222 return CA_STATUS_INVALID_PARAM;
225 if (NULL == data || 0 == size)
227 OIC_LOG(ERROR, TAG, "data is empty..");
228 return CA_STATUS_INVALID_PARAM;
231 // create thread data
232 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
236 OIC_LOG(ERROR, TAG, "memory error!!");
237 return CA_MEMORY_ALLOC_FAILED;
241 message->size = size;
244 oc_mutex_lock(thread->threadMutex);
246 // add thread data into list
247 u_queue_add_element(thread->dataQueue, message);
250 oc_cond_signal(thread->threadCond);
253 oc_mutex_unlock(thread->threadMutex);
258 CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
262 OIC_LOG(ERROR, TAG, "thread instance is empty..");
263 return CA_STATUS_INVALID_PARAM;
266 OIC_LOG(DEBUG, TAG, "clear queue data..");
269 oc_mutex_lock(thread->threadMutex);
271 // remove all remained list data.
272 while (u_queue_get_size(thread->dataQueue) > 0)
275 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
280 if (NULL != thread->destroy)
282 thread->destroy(message->msg, message->size);
286 OICFree(message->msg);
294 oc_mutex_unlock(thread->threadMutex);
299 CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
300 CAContextDataDestroy callback, void *ctx)
304 OIC_LOG(ERROR, TAG, "thread instance is empty..");
305 return CA_STATUS_INVALID_PARAM;
308 if (NULL == callback)
310 OIC_LOG(ERROR, TAG, "callback is NULL..");
311 return CA_STATUS_INVALID_PARAM;
316 OIC_LOG(ERROR, TAG, "ctx is NULL..");
317 return CA_STATUS_INVALID_PARAM;
320 OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
323 oc_mutex_lock(thread->threadMutex);
325 // remove adapter related list data.
326 u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
329 oc_mutex_unlock(thread->threadMutex);
334 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
338 OIC_LOG(ERROR, TAG, "thread instance is empty..");
339 return CA_STATUS_INVALID_PARAM;
342 OIC_LOG(DEBUG, TAG, "thread destroy..");
345 oc_mutex_lock(thread->threadMutex);
347 // remove all remained list data.
348 while (u_queue_get_size(thread->dataQueue) > 0)
351 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
356 if (NULL != thread->destroy)
358 thread->destroy(message->msg, message->size);
362 OICFree(message->msg);
370 oc_mutex_unlock(thread->threadMutex);
372 oc_mutex_free(thread->threadMutex);
373 thread->threadMutex = NULL;
374 oc_cond_free(thread->threadCond);
375 thread->threadCond = NULL;
377 u_queue_delete(thread->dataQueue);
378 thread->dataQueue = NULL;
383 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
387 OIC_LOG(ERROR, TAG, "thread instance is empty..");
388 return CA_STATUS_INVALID_PARAM;
391 OIC_LOG(DEBUG, TAG, "thread stop request!!");
396 oc_mutex_lock(thread->threadMutex);
399 thread->isStop = true;
402 oc_cond_signal(thread->threadCond);
404 oc_cond_wait(thread->threadCond, thread->threadMutex);
407 oc_mutex_unlock(thread->threadMutex);