0afeed9f1187a5b6e0bdb16d55e985fe7685988d
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #ifdef __TIZENRT__
22 #include <tinyara/config.h>
23 #endif
24
25 #include "iotivity_config.h"
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
34 #endif
35
36 #include "caqueueingthread.h"
37 #include "camessagehandler.h"
38 #include "oic_malloc.h"
39 #include "logger.h"
40
41 #define TAG PCF("OIC_CA_QING")
42
43 static void CAQueueingThreadBaseRoutine(void *threadValue)
44 {
45     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
46
47     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
48
49     if (NULL == thread)
50     {
51         OIC_LOG(ERROR, TAG, "thread data passing error!!");
52         return;
53     }
54
55     while (!thread->isStop)
56     {
57         // mutex lock
58         oc_mutex_lock(thread->threadMutex);
59
60         // if queue is empty, thread will wait
61         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
62         {
63             OIC_LOG(DEBUG, TAG, "wait..");
64
65             // wait
66             oc_cond_wait(thread->threadCond, thread->threadMutex);
67
68             OIC_LOG(DEBUG, TAG, "wake up..");
69         }
70
71         // check stop flag
72         if (thread->isStop)
73         {
74             // mutex unlock
75             oc_mutex_unlock(thread->threadMutex);
76             continue;
77         }
78
79         // get data
80         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
81         // mutex unlock
82         oc_mutex_unlock(thread->threadMutex);
83         if (NULL == message)
84         {
85             continue;
86         }
87
88         // process data
89         thread->threadTask(message->msg);
90
91         // free
92         if (NULL != thread->destroy)
93         {
94             thread->destroy(message->msg, message->size);
95         }
96         else
97         {
98             OICFree(message->msg);
99         }
100
101         OICFree(message);
102     }
103
104     oc_mutex_lock(thread->threadMutex);
105     oc_cond_signal(thread->threadCond);
106     oc_mutex_unlock(thread->threadMutex);
107
108     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
109 }
110
111 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
112                                       CAThreadTask task, CADataDestroyFunction destroy)
113 {
114     if (NULL == thread)
115     {
116         OIC_LOG(ERROR, TAG, "thread instance is empty..");
117         return CA_STATUS_INVALID_PARAM;
118     }
119
120     if (NULL == handle)
121     {
122         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
123         return CA_STATUS_INVALID_PARAM;
124     }
125
126     OIC_LOG(DEBUG, TAG, "thread initialize..");
127
128     // set send thread data
129     thread->threadPool = handle;
130     thread->dataQueue = u_queue_create();
131     thread->threadMutex = oc_mutex_new();
132     thread->threadCond = oc_cond_new();
133     thread->isStop = true;
134     thread->threadTask = task;
135     thread->destroy = destroy;
136     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
137     {
138         goto ERROR_MEM_FAILURE;
139     }
140
141     return CA_STATUS_OK;
142
143 ERROR_MEM_FAILURE:
144     if (thread->dataQueue)
145     {
146         u_queue_delete(thread->dataQueue);
147         thread->dataQueue = NULL;
148     }
149     if (thread->threadMutex)
150     {
151         oc_mutex_free(thread->threadMutex);
152         thread->threadMutex = NULL;
153     }
154     if (thread->threadCond)
155     {
156         oc_cond_free(thread->threadCond);
157         thread->threadCond = NULL;
158     }
159     return CA_MEMORY_ALLOC_FAILED;
160 }
161 #ifndef __TIZENRT__
162 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
163 #else
164 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
165 #endif
166 {
167     if (NULL == thread)
168     {
169         OIC_LOG(ERROR, TAG, "thread instance is empty..");
170         return CA_STATUS_INVALID_PARAM;
171     }
172
173     if (NULL == thread->threadPool)
174     {
175         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
176         return CA_STATUS_INVALID_PARAM;
177     }
178
179     if (false == thread->isStop) //Queueing thread already running
180     {
181         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
182         return CA_STATUS_OK;
183     }
184
185     // mutex lock
186     oc_mutex_lock(thread->threadMutex);
187     thread->isStop = false;
188     // mutex unlock
189     oc_mutex_unlock(thread->threadMutex);
190 #ifndef __TIZENRT__
191     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
192                                              thread, NULL);
193 #else
194     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
195                                              thread, NULL, thread_name,
196                                              CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
197 #endif
198     if (res != CA_STATUS_OK)
199     {
200         // update thread status.
201         oc_mutex_lock(thread->threadMutex);
202         thread->isStop = true;
203         oc_mutex_unlock(thread->threadMutex);
204
205         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
206     }
207
208     return res;
209 }
210
211 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
212 {
213     if (NULL == thread)
214     {
215         OIC_LOG(ERROR, TAG, "thread instance is empty..");
216         return CA_STATUS_INVALID_PARAM;
217     }
218
219     if (NULL == data || 0 == size)
220     {
221         OIC_LOG(ERROR, TAG, "data is empty..");
222         return CA_STATUS_INVALID_PARAM;
223     }
224
225     // create thread data
226     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
227
228     if (NULL == message)
229     {
230         OIC_LOG(ERROR, TAG, "memory error!!");
231         return CA_MEMORY_ALLOC_FAILED;
232     }
233
234     message->msg = data;
235     message->size = size;
236
237     // mutex lock
238     oc_mutex_lock(thread->threadMutex);
239
240     // add thread data into list
241     u_queue_add_element(thread->dataQueue, message);
242
243     // notity the thread
244     oc_cond_signal(thread->threadCond);
245
246     // mutex unlock
247     oc_mutex_unlock(thread->threadMutex);
248
249     return CA_STATUS_OK;
250 }
251
252 CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
253 {
254     if (NULL == thread)
255     {
256         OIC_LOG(ERROR, TAG, "thread instance is empty..");
257         return CA_STATUS_INVALID_PARAM;
258     }
259
260     OIC_LOG(DEBUG, TAG, "clear queue data..");
261
262     // mutex lock
263     oc_mutex_lock(thread->threadMutex);
264
265     // remove all remained list data.
266     while (u_queue_get_size(thread->dataQueue) > 0)
267     {
268         // get data
269         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
270
271         // free
272         if (NULL != message)
273         {
274             if (NULL != thread->destroy)
275             {
276                 thread->destroy(message->msg, message->size);
277             }
278             else
279             {
280                 OICFree(message->msg);
281             }
282
283             OICFree(message);
284         }
285     }
286
287     // mutex unlock
288     oc_mutex_unlock(thread->threadMutex);
289
290     return CA_STATUS_OK;
291 }
292
293 CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
294                                             CAContextDataDestroy callback, void *ctx)
295 {
296     if (NULL == thread)
297     {
298         OIC_LOG(ERROR, TAG, "thread instance is empty..");
299         return CA_STATUS_INVALID_PARAM;
300     }
301
302     if (NULL == callback)
303     {
304         OIC_LOG(ERROR, TAG, "callback is NULL..");
305         return CA_STATUS_INVALID_PARAM;
306     }
307
308     if (NULL == ctx)
309     {
310         OIC_LOG(ERROR, TAG, "ctx is NULL..");
311         return CA_STATUS_INVALID_PARAM;
312     }
313
314     OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
315
316     // mutex lock
317     oc_mutex_lock(thread->threadMutex);
318
319     // remove adapter related list data.
320     u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
321
322     // mutex unlock
323     oc_mutex_unlock(thread->threadMutex);
324
325     return CA_STATUS_OK;
326 }
327
328 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
329 {
330     if (NULL == thread)
331     {
332         OIC_LOG(ERROR, TAG, "thread instance is empty..");
333         return CA_STATUS_INVALID_PARAM;
334     }
335
336     OIC_LOG(DEBUG, TAG, "thread destroy..");
337
338     // mutex lock
339     oc_mutex_lock(thread->threadMutex);
340
341     // remove all remained list data.
342     while (u_queue_get_size(thread->dataQueue) > 0)
343     {
344         // get data
345         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
346
347         // free
348         if (NULL != message)
349         {
350             if (NULL != thread->destroy)
351             {
352                 thread->destroy(message->msg, message->size);
353             }
354             else
355             {
356                 OICFree(message->msg);
357             }
358
359             OICFree(message);
360         }
361     }
362
363     // mutex unlock
364     oc_mutex_unlock(thread->threadMutex);
365
366     oc_mutex_free(thread->threadMutex);
367     thread->threadMutex = NULL;
368     oc_cond_free(thread->threadCond);
369     thread->threadCond = NULL;
370
371     u_queue_delete(thread->dataQueue);
372     thread->dataQueue = NULL;
373
374     return CA_STATUS_OK;
375 }
376
377 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
378 {
379     if (NULL == thread)
380     {
381         OIC_LOG(ERROR, TAG, "thread instance is empty..");
382         return CA_STATUS_INVALID_PARAM;
383     }
384
385     OIC_LOG(DEBUG, TAG, "thread stop request!!");
386
387     if (!thread->isStop)
388     {
389         // mutex lock
390         oc_mutex_lock(thread->threadMutex);
391
392         // set stop flag
393         thread->isStop = true;
394
395         // notify the thread
396         oc_cond_signal(thread->threadCond);
397
398         oc_cond_wait(thread->threadCond, thread->threadMutex);
399
400         // mutex unlock
401         oc_mutex_unlock(thread->threadMutex);
402     }
403
404     return CA_STATUS_OK;
405 }