1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
25 #include <sys/syscall.h>
26 #include <sys/types.h>
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
36 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
38 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
42 OIC_LOG(ERROR, TAG, "thread data passing error!!");
47 while (!thread->isStop)
50 ca_mutex_lock(thread->threadMutex);
52 // if queue is empty, thread will wait
53 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
55 OIC_LOG(DEBUG, TAG, "wait..");
58 ca_cond_wait(thread->threadCond, thread->threadMutex);
60 OIC_LOG(DEBUG, TAG, "wake up..");
64 ca_mutex_unlock(thread->threadMutex);
73 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
80 thread->threadTask(message->msg);
83 if (NULL != thread->destroy)
85 thread->destroy(message->msg, message->size);
89 OICFree(message->msg);
95 // remove all remained list data.
96 while (u_queue_get_size(thread->dataQueue) > 0)
99 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
104 if (NULL != thread->destroy)
106 thread->destroy(message->msg, message->size);
110 OICFree(message->msg);
117 ca_mutex_lock(thread->threadMutex);
118 ca_cond_signal(thread->threadCond);
119 ca_mutex_unlock(thread->threadMutex);
121 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
124 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
125 CAThreadTask task, CADataDestroyFunction destroy)
129 OIC_LOG(ERROR, TAG, "thread instance is empty..");
130 return CA_STATUS_INVALID_PARAM;
135 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
136 return CA_STATUS_INVALID_PARAM;
139 OIC_LOG(DEBUG, TAG, "thread initialize..");
141 // set send thread data
142 thread->threadPool = handle;
143 thread->dataQueue = u_queue_create();
144 thread->threadMutex = ca_mutex_new();
145 thread->threadCond = ca_cond_new();
146 thread->isStop = true;
147 thread->threadTask = task;
148 thread->destroy = destroy;
153 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
157 OIC_LOG(ERROR, TAG, "thread instance is empty..");
158 return CA_STATUS_INVALID_PARAM;
161 if (NULL == thread->threadPool)
163 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
164 return CA_STATUS_INVALID_PARAM;
167 if (false == thread->isStop) //Queueing thread already running
169 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
174 ca_mutex_lock(thread->threadMutex);
175 thread->isStop = false;
177 ca_mutex_unlock(thread->threadMutex);
179 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
181 if (res != CA_STATUS_OK)
183 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
189 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
193 OIC_LOG(ERROR, TAG, "thread instance is empty..");
194 return CA_STATUS_INVALID_PARAM;
197 if (NULL == data || 0 == size)
199 OIC_LOG(ERROR, TAG, "data is empty..");
201 return CA_STATUS_INVALID_PARAM;
204 // create thread data
205 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
209 OIC_LOG(ERROR, TAG, "memory error!!");
210 return CA_MEMORY_ALLOC_FAILED;
214 message->size = size;
217 ca_mutex_lock(thread->threadMutex);
219 // add thread data into list
220 u_queue_add_element(thread->dataQueue, message);
223 ca_cond_signal(thread->threadCond);
226 ca_mutex_unlock(thread->threadMutex);
231 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
235 OIC_LOG(ERROR, TAG, "thread instance is empty..");
236 return CA_STATUS_INVALID_PARAM;
239 OIC_LOG(DEBUG, TAG, "thread destroy..");
241 ca_mutex_free(thread->threadMutex);
242 thread->threadMutex = NULL;
243 ca_cond_free(thread->threadCond);
244 u_queue_delete(thread->dataQueue);
249 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
253 OIC_LOG(ERROR, TAG, "thread instance is empty..");
254 return CA_STATUS_INVALID_PARAM;
257 OIC_LOG(DEBUG, TAG, "thread stop request!!");
262 ca_mutex_lock(thread->threadMutex);
265 thread->isStop = true;
268 ca_cond_signal(thread->threadCond);
270 ca_cond_wait(thread->threadCond, thread->threadMutex);
273 ca_mutex_unlock(thread->threadMutex);