1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
31 #include "caqueueingthread.h"
32 #include "oic_malloc.h"
35 #define TAG PCF("OIC_CA_QING")
37 static void CAQueueingThreadBaseRoutine(void *threadValue)
39 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
41 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
45 OIC_LOG(ERROR, TAG, "thread data passing error!!");
49 while (!thread->isStop)
52 ca_mutex_lock(thread->threadMutex);
54 // if queue is empty, thread will wait
55 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
57 OIC_LOG(DEBUG, TAG, "wait..");
60 ca_cond_wait(thread->threadCond, thread->threadMutex);
62 OIC_LOG(DEBUG, TAG, "wake up..");
69 ca_mutex_unlock(thread->threadMutex);
74 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
76 ca_mutex_unlock(thread->threadMutex);
83 thread->threadTask(message->msg);
86 if (NULL != thread->destroy)
88 thread->destroy(message->msg, message->size);
92 OICFree(message->msg);
98 ca_mutex_lock(thread->threadMutex);
99 ca_cond_signal(thread->threadCond);
100 ca_mutex_unlock(thread->threadMutex);
102 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
105 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
106 CAThreadTask task, CADataDestroyFunction destroy)
110 OIC_LOG(ERROR, TAG, "thread instance is empty..");
111 return CA_STATUS_INVALID_PARAM;
116 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
117 return CA_STATUS_INVALID_PARAM;
120 OIC_LOG(DEBUG, TAG, "thread initialize..");
122 // set send thread data
123 thread->threadPool = handle;
124 thread->dataQueue = u_queue_create();
125 thread->threadMutex = ca_mutex_new();
126 thread->threadCond = ca_cond_new();
127 thread->isStop = true;
128 thread->threadTask = task;
129 thread->destroy = destroy;
130 if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
132 goto ERROR_MEM_FAILURE;
138 if (thread->dataQueue)
140 u_queue_delete(thread->dataQueue);
141 thread->dataQueue = NULL;
143 if (thread->threadMutex)
145 ca_mutex_free(thread->threadMutex);
146 thread->threadMutex = NULL;
148 if (thread->threadCond)
150 ca_cond_free(thread->threadCond);
151 thread->threadCond = NULL;
153 return CA_MEMORY_ALLOC_FAILED;
156 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
160 OIC_LOG(ERROR, TAG, "thread instance is empty..");
161 return CA_STATUS_INVALID_PARAM;
164 if (NULL == thread->threadPool)
166 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
167 return CA_STATUS_INVALID_PARAM;
170 if (false == thread->isStop) //Queueing thread already running
172 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
177 ca_mutex_lock(thread->threadMutex);
178 thread->isStop = false;
180 ca_mutex_unlock(thread->threadMutex);
182 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
184 if (res != CA_STATUS_OK)
186 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
192 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
196 OIC_LOG(ERROR, TAG, "thread instance is empty..");
197 return CA_STATUS_INVALID_PARAM;
200 if (NULL == data || 0 == size)
202 OIC_LOG(ERROR, TAG, "data is empty..");
204 return CA_STATUS_INVALID_PARAM;
207 // create thread data
208 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
212 OIC_LOG(ERROR, TAG, "memory error!!");
213 return CA_MEMORY_ALLOC_FAILED;
217 message->size = size;
220 ca_mutex_lock(thread->threadMutex);
222 // add thread data into list
223 u_queue_add_element(thread->dataQueue, message);
226 ca_cond_signal(thread->threadCond);
229 ca_mutex_unlock(thread->threadMutex);
234 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
238 OIC_LOG(ERROR, TAG, "thread instance is empty..");
239 return CA_STATUS_INVALID_PARAM;
242 OIC_LOG(DEBUG, TAG, "thread destroy..");
244 ca_mutex_free(thread->threadMutex);
245 thread->threadMutex = NULL;
246 ca_cond_free(thread->threadCond);
248 // remove all remained list data.
249 while (u_queue_get_size(thread->dataQueue) > 0)
252 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
257 if (NULL != thread->destroy)
259 thread->destroy(message->msg, message->size);
263 OICFree(message->msg);
270 u_queue_delete(thread->dataQueue);
275 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
279 OIC_LOG(ERROR, TAG, "thread instance is empty..");
280 return CA_STATUS_INVALID_PARAM;
283 OIC_LOG(DEBUG, TAG, "thread stop request!!");
288 ca_mutex_lock(thread->threadMutex);
291 thread->isStop = true;
294 ca_cond_signal(thread->threadCond);
296 ca_cond_wait(thread->threadCond, thread->threadMutex);
299 ca_mutex_unlock(thread->threadMutex);