1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
21 #include "iotivity_config.h"
28 #ifdef HAVE_SYS_TYPES_H
29 #include <sys/types.h>
32 #include "caqueueingthread.h"
33 #include "oic_malloc.h"
36 #define TAG PCF("OIC_CA_QING")
38 static void CAQueueingThreadBaseRoutine(void *threadValue)
40 OIC_LOG(DEBUG, TAG, "message handler main thread start..");
42 CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
46 OIC_LOG(ERROR, TAG, "thread data passing error!!");
50 while (!thread->isStop)
53 ca_mutex_lock(thread->threadMutex);
55 // if queue is empty, thread will wait
56 if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
58 OIC_LOG(DEBUG, TAG, "wait..");
61 ca_cond_wait(thread->threadCond, thread->threadMutex);
63 OIC_LOG(DEBUG, TAG, "wake up..");
70 ca_mutex_unlock(thread->threadMutex);
75 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
77 ca_mutex_unlock(thread->threadMutex);
84 thread->threadTask(message->msg);
87 if (NULL != thread->destroy)
89 thread->destroy(message->msg, message->size);
93 OICFree(message->msg);
99 ca_mutex_lock(thread->threadMutex);
100 ca_cond_signal(thread->threadCond);
101 ca_mutex_unlock(thread->threadMutex);
103 OIC_LOG(DEBUG, TAG, "message handler main thread end..");
106 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
107 CAThreadTask task, CADataDestroyFunction destroy)
111 OIC_LOG(ERROR, TAG, "thread instance is empty..");
112 return CA_STATUS_INVALID_PARAM;
117 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
118 return CA_STATUS_INVALID_PARAM;
121 OIC_LOG(DEBUG, TAG, "thread initialize..");
123 // set send thread data
124 thread->threadPool = handle;
125 thread->dataQueue = u_queue_create();
126 thread->threadMutex = ca_mutex_new();
127 thread->threadCond = ca_cond_new();
128 thread->isStop = true;
129 thread->threadTask = task;
130 thread->destroy = destroy;
131 if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
133 goto ERROR_MEM_FAILURE;
139 if (thread->dataQueue)
141 u_queue_delete(thread->dataQueue);
142 thread->dataQueue = NULL;
144 if (thread->threadMutex)
146 ca_mutex_free(thread->threadMutex);
147 thread->threadMutex = NULL;
149 if (thread->threadCond)
151 ca_cond_free(thread->threadCond);
152 thread->threadCond = NULL;
154 return CA_MEMORY_ALLOC_FAILED;
157 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
161 OIC_LOG(ERROR, TAG, "thread instance is empty..");
162 return CA_STATUS_INVALID_PARAM;
165 if (NULL == thread->threadPool)
167 OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
168 return CA_STATUS_INVALID_PARAM;
171 if (false == thread->isStop) //Queueing thread already running
173 OIC_LOG(DEBUG, TAG, "queueing thread already running..");
178 ca_mutex_lock(thread->threadMutex);
179 thread->isStop = false;
181 ca_mutex_unlock(thread->threadMutex);
183 CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
185 if (res != CA_STATUS_OK)
187 OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
193 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
197 OIC_LOG(ERROR, TAG, "thread instance is empty..");
198 return CA_STATUS_INVALID_PARAM;
201 if (NULL == data || 0 == size)
203 OIC_LOG(ERROR, TAG, "data is empty..");
205 return CA_STATUS_INVALID_PARAM;
208 // create thread data
209 u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
213 OIC_LOG(ERROR, TAG, "memory error!!");
214 return CA_MEMORY_ALLOC_FAILED;
218 message->size = size;
221 ca_mutex_lock(thread->threadMutex);
223 // add thread data into list
224 u_queue_add_element(thread->dataQueue, message);
227 ca_cond_signal(thread->threadCond);
230 ca_mutex_unlock(thread->threadMutex);
235 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
239 OIC_LOG(ERROR, TAG, "thread instance is empty..");
240 return CA_STATUS_INVALID_PARAM;
243 OIC_LOG(DEBUG, TAG, "thread destroy..");
245 ca_mutex_free(thread->threadMutex);
246 thread->threadMutex = NULL;
247 ca_cond_free(thread->threadCond);
249 // remove all remained list data.
250 while (u_queue_get_size(thread->dataQueue) > 0)
253 u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
258 if (NULL != thread->destroy)
260 thread->destroy(message->msg, message->size);
264 OICFree(message->msg);
271 u_queue_delete(thread->dataQueue);
272 thread->dataQueue = NULL;
277 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
281 OIC_LOG(ERROR, TAG, "thread instance is empty..");
282 return CA_STATUS_INVALID_PARAM;
285 OIC_LOG(DEBUG, TAG, "thread stop request!!");
290 ca_mutex_lock(thread->threadMutex);
293 thread->isStop = true;
296 ca_cond_signal(thread->threadCond);
298 ca_cond_wait(thread->threadCond, thread->threadMutex);
301 ca_mutex_unlock(thread->threadMutex);