1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
25 #include <sys/types.h>
27 #include "caqueueingthread.h"
28 #include "oic_malloc.h"
31 #define TAG PCF("CA_QING")
33 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
37 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
41 OIC_LOG(ERROR, TAG, "thread data passing error!!");
46 while (!thread->isStop)
49 ca_mutex_lock(thread->threadMutex);
51 // if queue is empty, thread will wait
52 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
54 OIC_LOG(DEBUG, TAG, "wait..");
57 ca_cond_wait(thread->threadCond, thread->threadMutex);
59 OIC_LOG(DEBUG, TAG, "wake up..");
68 ca_mutex_unlock(thread->threadMutex);
73 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
75 ca_mutex_unlock(thread->threadMutex);
82 thread->threadTask(message->msg);
85 if (NULL != thread->destroy)
87 thread->destroy(message->msg, message->size);
91 OICFree(message->msg);
97 // remove all remained list data.
98 while (u_queue_get_size(thread->dataQueue) > 0)
101 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
106 if (NULL != thread->destroy)
108 thread->destroy(message->msg, message->size);
112 OICFree(message->msg);
119 ca_mutex_lock(thread->threadMutex);
120 ca_cond_signal(thread->threadCond);
121 ca_mutex_unlock(thread->threadMutex);
123 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
126 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
127 CAThreadTask task, CADataDestroyFunction destroy)
131 OIC_LOG(ERROR, TAG, "thread instance is empty..");
132 return CA_STATUS_INVALID_PARAM;
137 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
138 return CA_STATUS_INVALID_PARAM;
141 OIC_LOG(DEBUG, TAG, "thread initialize..");
143 // set send thread data
144 thread->threadPool = handle;
145 thread->dataQueue = u_queue_create();
146 thread->threadMutex = ca_mutex_new();
147 thread->threadCond = ca_cond_new();
148 thread->isStop = true;
149 thread->threadTask = task;
150 thread->destroy = destroy;
151 if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
152 goto ERROR_MEM_FAILURE;
156 if(thread->dataQueue)
158 u_queue_delete(thread->dataQueue);
159 thread->dataQueue = NULL;
161 if(thread->threadMutex)
163 ca_mutex_free(thread->threadMutex);
164 thread->threadMutex = NULL;
166 if(thread->threadCond)
168 ca_cond_free(thread->threadCond);
169 thread->threadCond = NULL;
171 return CA_MEMORY_ALLOC_FAILED;
175 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
179 OIC_LOG(ERROR, TAG, "thread instance is empty..");
180 return CA_STATUS_INVALID_PARAM;
183 if (NULL == thread->threadPool)
185 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
186 return CA_STATUS_INVALID_PARAM;
189 if (false == thread->isStop) //Queueing thread already running
191 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
196 ca_mutex_lock(thread->threadMutex);
197 thread->isStop = false;
199 ca_mutex_unlock(thread->threadMutex);
201 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
203 if (res != CA_STATUS_OK)
205 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
211 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
215 OIC_LOG(ERROR, TAG, "thread instance is empty..");
216 return CA_STATUS_INVALID_PARAM;
219 if (NULL == data || 0 == size)
221 OIC_LOG(ERROR, TAG, "data is empty..");
223 return CA_STATUS_INVALID_PARAM;
226 // create thread data
227 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
231 OIC_LOG(ERROR, TAG, "memory error!!");
232 return CA_MEMORY_ALLOC_FAILED;
236 message->size = size;
239 ca_mutex_lock(thread->threadMutex);
241 // add thread data into list
242 u_queue_add_element(thread->dataQueue, message);
245 ca_cond_signal(thread->threadCond);
248 ca_mutex_unlock(thread->threadMutex);
253 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
257 OIC_LOG(ERROR, TAG, "thread instance is empty..");
258 return CA_STATUS_INVALID_PARAM;
261 OIC_LOG(DEBUG, TAG, "thread destroy..");
263 ca_mutex_free(thread->threadMutex);
264 thread->threadMutex = NULL;
265 ca_cond_free(thread->threadCond);
266 u_queue_delete(thread->dataQueue);
271 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
275 OIC_LOG(ERROR, TAG, "thread instance is empty..");
276 return CA_STATUS_INVALID_PARAM;
279 OIC_LOG(DEBUG, TAG, "thread stop request!!");
284 ca_mutex_lock(thread->threadMutex);
287 thread->isStop = true;
290 ca_cond_signal(thread->threadCond);
292 ca_cond_wait(thread->threadCond, thread->threadMutex);
295 ca_mutex_unlock(thread->threadMutex);