1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
25 #include <sys/types.h>
27 #include "caqueueingthread.h"
28 #include "oic_malloc.h"
31 #define TAG PCF("OIC_CA_QING")
33 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
37 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
41 OIC_LOG(ERROR, TAG, "thread data passing error!!");
45 while (!thread->isStop)
48 ca_mutex_lock(thread->threadMutex);
50 // if queue is empty, thread will wait
51 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
53 OIC_LOG(DEBUG, TAG, "wait..");
56 ca_cond_wait(thread->threadCond, thread->threadMutex);
58 OIC_LOG(DEBUG, TAG, "wake up..");
65 ca_mutex_unlock(thread->threadMutex);
70 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
72 ca_mutex_unlock(thread->threadMutex);
79 thread->threadTask(message->msg);
82 if (NULL != thread->destroy)
84 thread->destroy(message->msg, message->size);
88 OICFree(message->msg);
94 ca_mutex_lock(thread->threadMutex);
95 ca_cond_signal(thread->threadCond);
96 ca_mutex_unlock(thread->threadMutex);
98 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
101 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
102 CAThreadTask task, CADataDestroyFunction destroy)
106 OIC_LOG(ERROR, TAG, "thread instance is empty..");
107 return CA_STATUS_INVALID_PARAM;
112 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
113 return CA_STATUS_INVALID_PARAM;
116 OIC_LOG(DEBUG, TAG, "thread initialize..");
118 // set send thread data
119 thread->threadPool = handle;
120 thread->dataQueue = u_queue_create();
121 thread->threadMutex = ca_mutex_new();
122 thread->threadCond = ca_cond_new();
123 thread->isStop = true;
124 thread->threadTask = task;
125 thread->destroy = destroy;
126 if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
128 goto ERROR_MEM_FAILURE;
134 if (thread->dataQueue)
136 u_queue_delete(thread->dataQueue);
137 thread->dataQueue = NULL;
139 if (thread->threadMutex)
141 ca_mutex_free(thread->threadMutex);
142 thread->threadMutex = NULL;
144 if (thread->threadCond)
146 ca_cond_free(thread->threadCond);
147 thread->threadCond = NULL;
149 return CA_MEMORY_ALLOC_FAILED;
152 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
156 OIC_LOG(ERROR, TAG, "thread instance is empty..");
157 return CA_STATUS_INVALID_PARAM;
160 if (NULL == thread->threadPool)
162 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
163 return CA_STATUS_INVALID_PARAM;
166 if (false == thread->isStop) //Queueing thread already running
168 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
173 ca_mutex_lock(thread->threadMutex);
174 thread->isStop = false;
176 ca_mutex_unlock(thread->threadMutex);
178 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
180 if (res != CA_STATUS_OK)
182 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
188 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
192 OIC_LOG(ERROR, TAG, "thread instance is empty..");
193 return CA_STATUS_INVALID_PARAM;
196 if (NULL == data || 0 == size)
198 OIC_LOG(ERROR, TAG, "data is empty..");
200 return CA_STATUS_INVALID_PARAM;
203 // create thread data
204 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
208 OIC_LOG(ERROR, TAG, "memory error!!");
209 return CA_MEMORY_ALLOC_FAILED;
213 message->size = size;
216 ca_mutex_lock(thread->threadMutex);
218 // add thread data into list
219 u_queue_add_element(thread->dataQueue, message);
222 ca_cond_signal(thread->threadCond);
225 ca_mutex_unlock(thread->threadMutex);
230 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
234 OIC_LOG(ERROR, TAG, "thread instance is empty..");
235 return CA_STATUS_INVALID_PARAM;
238 OIC_LOG(DEBUG, TAG, "thread destroy..");
240 ca_mutex_free(thread->threadMutex);
241 thread->threadMutex = NULL;
242 ca_cond_free(thread->threadCond);
244 // remove all remained list data.
245 while (u_queue_get_size(thread->dataQueue) > 0)
248 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
253 if (NULL != thread->destroy)
255 thread->destroy(message->msg, message->size);
259 OICFree(message->msg);
266 u_queue_delete(thread->dataQueue);
271 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
275 OIC_LOG(ERROR, TAG, "thread instance is empty..");
276 return CA_STATUS_INVALID_PARAM;
279 OIC_LOG(DEBUG, TAG, "thread stop request!!");
284 ca_mutex_lock(thread->threadMutex);
287 thread->isStop = true;
290 ca_cond_signal(thread->threadCond);
292 ca_cond_wait(thread->threadCond, thread->threadMutex);
295 ca_mutex_unlock(thread->threadMutex);