Merge "Remove unused openssl-devel dependency" into tizen
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #ifdef __TIZENRT__
22 #include <tinyara/config.h>
23 #endif
24
25 #include "iotivity_config.h"
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
34 #endif
35
36 #include "caqueueingthread.h"
37 #include "camessagehandler.h"
38 #include "oic_malloc.h"
39 #include "logger.h"
40
41 #define TAG PCF("OIC_CA_QING")
42
43 static void CAQueueingThreadBaseRoutine(void *threadValue)
44 {
45     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
46
47     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
48
49     if (NULL == thread)
50     {
51         OIC_LOG(ERROR, TAG, "thread data passing error!!");
52         return;
53     }
54
55     if (NULL == thread->threadMutex || NULL == thread->threadCond || NULL == thread->dataQueue)
56     {
57         OIC_LOG(ERROR, TAG, "thread data was already destroyed");
58         return;
59     }
60
61     while (!thread->isStop)
62     {
63         // mutex lock
64         oc_mutex_lock(thread->threadMutex);
65
66         // if queue is empty, thread will wait
67         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
68         {
69             OIC_LOG(DEBUG, TAG, "wait..");
70
71             // wait
72             oc_cond_wait(thread->threadCond, thread->threadMutex);
73
74             OIC_LOG(DEBUG, TAG, "wake up..");
75         }
76
77         // check stop flag
78         if (thread->isStop)
79         {
80             // mutex unlock
81             oc_mutex_unlock(thread->threadMutex);
82             continue;
83         }
84
85         // get data
86         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
87         // mutex unlock
88         oc_mutex_unlock(thread->threadMutex);
89         if (NULL == message)
90         {
91             continue;
92         }
93
94         // process data
95         thread->threadTask(message->msg);
96
97         // free
98         if (NULL != thread->destroy)
99         {
100             thread->destroy(message->msg, message->size);
101         }
102         else
103         {
104             OICFree(message->msg);
105         }
106
107         OICFree(message);
108     }
109
110     oc_mutex_lock(thread->threadMutex);
111     oc_cond_signal(thread->threadCond);
112     oc_mutex_unlock(thread->threadMutex);
113
114     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
115 }
116
117 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
118                                       CAThreadTask task, CADataDestroyFunction destroy)
119 {
120     if (NULL == thread)
121     {
122         OIC_LOG(ERROR, TAG, "thread instance is empty..");
123         return CA_STATUS_INVALID_PARAM;
124     }
125
126     if (NULL == handle)
127     {
128         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
129         return CA_STATUS_INVALID_PARAM;
130     }
131
132     OIC_LOG(DEBUG, TAG, "thread initialize..");
133
134     // set send thread data
135     thread->threadPool = handle;
136     thread->dataQueue = u_queue_create();
137     thread->threadMutex = oc_mutex_new();
138     thread->threadCond = oc_cond_new();
139     thread->isStop = true;
140     thread->threadTask = task;
141     thread->destroy = destroy;
142     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
143     {
144         goto ERROR_MEM_FAILURE;
145     }
146
147     return CA_STATUS_OK;
148
149 ERROR_MEM_FAILURE:
150     if (thread->dataQueue)
151     {
152         u_queue_delete(thread->dataQueue);
153         thread->dataQueue = NULL;
154     }
155     if (thread->threadMutex)
156     {
157         oc_mutex_free(thread->threadMutex);
158         thread->threadMutex = NULL;
159     }
160     if (thread->threadCond)
161     {
162         oc_cond_free(thread->threadCond);
163         thread->threadCond = NULL;
164     }
165     return CA_MEMORY_ALLOC_FAILED;
166 }
167 #ifndef __TIZENRT__
168 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
169 #else
170 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
171 #endif
172 {
173     if (NULL == thread)
174     {
175         OIC_LOG(ERROR, TAG, "thread instance is empty..");
176         return CA_STATUS_INVALID_PARAM;
177     }
178
179     if (NULL == thread->threadPool)
180     {
181         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
182         return CA_STATUS_INVALID_PARAM;
183     }
184
185     if (false == thread->isStop) //Queueing thread already running
186     {
187         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
188         return CA_STATUS_OK;
189     }
190
191     // mutex lock
192     oc_mutex_lock(thread->threadMutex);
193     thread->isStop = false;
194     // mutex unlock
195     oc_mutex_unlock(thread->threadMutex);
196 #ifndef __TIZENRT__
197     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
198                                              thread, NULL);
199 #else
200     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
201                                              thread, NULL, thread_name,
202                                              CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
203 #endif
204     if (res != CA_STATUS_OK)
205     {
206         // update thread status.
207         oc_mutex_lock(thread->threadMutex);
208         thread->isStop = true;
209         oc_mutex_unlock(thread->threadMutex);
210
211         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
212     }
213
214     return res;
215 }
216
217 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
218 {
219     if (NULL == thread)
220     {
221         OIC_LOG(ERROR, TAG, "thread instance is empty..");
222         return CA_STATUS_INVALID_PARAM;
223     }
224
225     if (NULL == data || 0 == size)
226     {
227         OIC_LOG(ERROR, TAG, "data is empty..");
228         return CA_STATUS_INVALID_PARAM;
229     }
230
231     // create thread data
232     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
233
234     if (NULL == message)
235     {
236         OIC_LOG(ERROR, TAG, "memory error!!");
237         return CA_MEMORY_ALLOC_FAILED;
238     }
239
240     message->msg = data;
241     message->size = size;
242
243     // mutex lock
244     oc_mutex_lock(thread->threadMutex);
245
246     // add thread data into list
247     u_queue_add_element(thread->dataQueue, message);
248
249     // notity the thread
250     oc_cond_signal(thread->threadCond);
251
252     // mutex unlock
253     oc_mutex_unlock(thread->threadMutex);
254
255     return CA_STATUS_OK;
256 }
257
258 CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
259 {
260     if (NULL == thread)
261     {
262         OIC_LOG(ERROR, TAG, "thread instance is empty..");
263         return CA_STATUS_INVALID_PARAM;
264     }
265
266     OIC_LOG(DEBUG, TAG, "clear queue data..");
267
268     // mutex lock
269     oc_mutex_lock(thread->threadMutex);
270
271     // remove all remained list data.
272     while (u_queue_get_size(thread->dataQueue) > 0)
273     {
274         // get data
275         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
276
277         // free
278         if (NULL != message)
279         {
280             if (NULL != thread->destroy)
281             {
282                 thread->destroy(message->msg, message->size);
283             }
284             else
285             {
286                 OICFree(message->msg);
287             }
288
289             OICFree(message);
290         }
291     }
292
293     // mutex unlock
294     oc_mutex_unlock(thread->threadMutex);
295
296     return CA_STATUS_OK;
297 }
298
299 CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
300                                             CAContextDataDestroy callback, void *ctx)
301 {
302     if (NULL == thread)
303     {
304         OIC_LOG(ERROR, TAG, "thread instance is empty..");
305         return CA_STATUS_INVALID_PARAM;
306     }
307
308     if (NULL == callback)
309     {
310         OIC_LOG(ERROR, TAG, "callback is NULL..");
311         return CA_STATUS_INVALID_PARAM;
312     }
313
314     if (NULL == ctx)
315     {
316         OIC_LOG(ERROR, TAG, "ctx is NULL..");
317         return CA_STATUS_INVALID_PARAM;
318     }
319
320     OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
321
322     // mutex lock
323     oc_mutex_lock(thread->threadMutex);
324
325     // remove adapter related list data.
326     u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
327
328     // mutex unlock
329     oc_mutex_unlock(thread->threadMutex);
330
331     return CA_STATUS_OK;
332 }
333
334 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
335 {
336     if (NULL == thread)
337     {
338         OIC_LOG(ERROR, TAG, "thread instance is empty..");
339         return CA_STATUS_INVALID_PARAM;
340     }
341
342     OIC_LOG(DEBUG, TAG, "thread destroy..");
343
344     // mutex lock
345     oc_mutex_lock(thread->threadMutex);
346
347     // remove all remained list data.
348     while (u_queue_get_size(thread->dataQueue) > 0)
349     {
350         // get data
351         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
352
353         // free
354         if (NULL != message)
355         {
356             if (NULL != thread->destroy)
357             {
358                 thread->destroy(message->msg, message->size);
359             }
360             else
361             {
362                 OICFree(message->msg);
363             }
364
365             OICFree(message);
366         }
367     }
368
369     // mutex unlock
370     oc_mutex_unlock(thread->threadMutex);
371
372     oc_mutex_free(thread->threadMutex);
373     thread->threadMutex = NULL;
374     oc_cond_free(thread->threadCond);
375     thread->threadCond = NULL;
376
377     u_queue_delete(thread->dataQueue);
378     thread->dataQueue = NULL;
379
380     return CA_STATUS_OK;
381 }
382
383 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
384 {
385     if (NULL == thread)
386     {
387         OIC_LOG(ERROR, TAG, "thread instance is empty..");
388         return CA_STATUS_INVALID_PARAM;
389     }
390
391     OIC_LOG(DEBUG, TAG, "thread stop request!!");
392
393     if (!thread->isStop)
394     {
395         // mutex lock
396         oc_mutex_lock(thread->threadMutex);
397
398         // set stop flag
399         thread->isStop = true;
400
401         // notify the thread
402         oc_cond_signal(thread->threadCond);
403
404         oc_cond_wait(thread->threadCond, thread->threadMutex);
405
406         // mutex unlock
407         oc_mutex_unlock(thread->threadMutex);
408     }
409
410     return CA_STATUS_OK;
411 }