Revert "Generate iotivity_config.h at build time"
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #ifdef HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30
31 #include "caqueueingthread.h"
32 #include "oic_malloc.h"
33 #include "logger.h"
34
35 #define TAG PCF("OIC_CA_QING")
36
37 static void CAQueueingThreadBaseRoutine(void *threadValue)
38 {
39     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
40
41     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
42
43     if (NULL == thread)
44     {
45         OIC_LOG(ERROR, TAG, "thread data passing error!!");
46         return;
47     }
48
49     while (!thread->isStop)
50     {
51         // mutex lock
52         ca_mutex_lock(thread->threadMutex);
53
54         // if queue is empty, thread will wait
55         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
56         {
57             OIC_LOG(DEBUG, TAG, "wait..");
58
59             // wait
60             ca_cond_wait(thread->threadCond, thread->threadMutex);
61
62             OIC_LOG(DEBUG, TAG, "wake up..");
63         }
64
65         // check stop flag
66         if (thread->isStop)
67         {
68             // mutex unlock
69             ca_mutex_unlock(thread->threadMutex);
70             continue;
71         }
72
73         // get data
74         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
75         // mutex unlock
76         ca_mutex_unlock(thread->threadMutex);
77         if (NULL == message)
78         {
79             continue;
80         }
81
82         // process data
83         thread->threadTask(message->msg);
84
85         // free
86         if (NULL != thread->destroy)
87         {
88             thread->destroy(message->msg, message->size);
89         }
90         else
91         {
92             OICFree(message->msg);
93         }
94
95         OICFree(message);
96     }
97
98     ca_mutex_lock(thread->threadMutex);
99     ca_cond_signal(thread->threadCond);
100     ca_mutex_unlock(thread->threadMutex);
101
102     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
103 }
104
105 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
106                                       CAThreadTask task, CADataDestroyFunction destroy)
107 {
108     if (NULL == thread)
109     {
110         OIC_LOG(ERROR, TAG, "thread instance is empty..");
111         return CA_STATUS_INVALID_PARAM;
112     }
113
114     if (NULL == handle)
115     {
116         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
117         return CA_STATUS_INVALID_PARAM;
118     }
119
120     OIC_LOG(DEBUG, TAG, "thread initialize..");
121
122     // set send thread data
123     thread->threadPool = handle;
124     thread->dataQueue = u_queue_create();
125     thread->threadMutex = ca_mutex_new();
126     thread->threadCond = ca_cond_new();
127     thread->isStop = true;
128     thread->threadTask = task;
129     thread->destroy = destroy;
130     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
131     {
132         goto ERROR_MEM_FAILURE;
133     }
134
135     return CA_STATUS_OK;
136
137 ERROR_MEM_FAILURE:
138     if (thread->dataQueue)
139     {
140         u_queue_delete(thread->dataQueue);
141         thread->dataQueue = NULL;
142     }
143     if (thread->threadMutex)
144     {
145         ca_mutex_free(thread->threadMutex);
146         thread->threadMutex = NULL;
147     }
148     if (thread->threadCond)
149     {
150         ca_cond_free(thread->threadCond);
151         thread->threadCond = NULL;
152     }
153     return CA_MEMORY_ALLOC_FAILED;
154 }
155
156 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
157 {
158     if (NULL == thread)
159     {
160         OIC_LOG(ERROR, TAG, "thread instance is empty..");
161         return CA_STATUS_INVALID_PARAM;
162     }
163
164     if (NULL == thread->threadPool)
165     {
166         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
167         return CA_STATUS_INVALID_PARAM;
168     }
169
170     if (false == thread->isStop) //Queueing thread already running
171     {
172         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
173         return CA_STATUS_OK;
174     }
175
176     // mutex lock
177     ca_mutex_lock(thread->threadMutex);
178     thread->isStop = false;
179     // mutex unlock
180     ca_mutex_unlock(thread->threadMutex);
181
182     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
183                                             thread);
184     if (res != CA_STATUS_OK)
185     {
186         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
187     }
188
189     return res;
190 }
191
192 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
193 {
194     if (NULL == thread)
195     {
196         OIC_LOG(ERROR, TAG, "thread instance is empty..");
197         return CA_STATUS_INVALID_PARAM;
198     }
199
200     if (NULL == data || 0 == size)
201     {
202         OIC_LOG(ERROR, TAG, "data is empty..");
203
204         return CA_STATUS_INVALID_PARAM;
205     }
206
207     // create thread data
208     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
209
210     if (NULL == message)
211     {
212         OIC_LOG(ERROR, TAG, "memory error!!");
213         return CA_MEMORY_ALLOC_FAILED;
214     }
215
216     message->msg = data;
217     message->size = size;
218
219     // mutex lock
220     ca_mutex_lock(thread->threadMutex);
221
222     // add thread data into list
223     u_queue_add_element(thread->dataQueue, message);
224
225     // notity the thread
226     ca_cond_signal(thread->threadCond);
227
228     // mutex unlock
229     ca_mutex_unlock(thread->threadMutex);
230
231     return CA_STATUS_OK;
232 }
233
234 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
235 {
236     if (NULL == thread)
237     {
238         OIC_LOG(ERROR, TAG, "thread instance is empty..");
239         return CA_STATUS_INVALID_PARAM;
240     }
241
242     OIC_LOG(DEBUG, TAG, "thread destroy..");
243
244     ca_mutex_free(thread->threadMutex);
245     thread->threadMutex = NULL;
246     ca_cond_free(thread->threadCond);
247
248     // remove all remained list data.
249     while (u_queue_get_size(thread->dataQueue) > 0)
250     {
251         // get data
252         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
253
254         // free
255         if (NULL != message)
256         {
257             if (NULL != thread->destroy)
258             {
259                 thread->destroy(message->msg, message->size);
260             }
261             else
262             {
263                 OICFree(message->msg);
264             }
265
266             OICFree(message);
267         }
268     }
269
270     u_queue_delete(thread->dataQueue);
271
272     return CA_STATUS_OK;
273 }
274
275 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
276 {
277     if (NULL == thread)
278     {
279         OIC_LOG(ERROR, TAG, "thread instance is empty..");
280         return CA_STATUS_INVALID_PARAM;
281     }
282
283     OIC_LOG(DEBUG, TAG, "thread stop request!!");
284
285     if (!thread->isStop)
286     {
287         // mutex lock
288         ca_mutex_lock(thread->threadMutex);
289
290         // set stop flag
291         thread->isStop = true;
292
293         // notify the thread
294         ca_cond_signal(thread->threadCond);
295
296         ca_cond_wait(thread->threadCond, thread->threadMutex);
297
298         // mutex unlock
299         ca_mutex_unlock(thread->threadMutex);
300     }
301
302     return CA_STATUS_OK;
303 }