Merge branch 'master' into easysetup
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26
27 #include "caqueueingthread.h"
28 #include "oic_malloc.h"
29 #include "logger.h"
30
31 #define TAG PCF("CA_QING")
32
33 static void CAQueueingThreadBaseRoutine(void *threadValue)
34 {
35     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
36
37     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
38
39     if (NULL == thread)
40     {
41         OIC_LOG(ERROR, TAG, "thread data passing error!!");
42
43         return;
44     }
45
46     while (!thread->isStop)
47     {
48         // mutex lock
49         ca_mutex_lock(thread->threadMutex);
50
51         // if queue is empty, thread will wait
52         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
53         {
54             OIC_LOG(DEBUG, TAG, "wait..");
55
56             // wait
57             ca_cond_wait(thread->threadCond, thread->threadMutex);
58
59             OIC_LOG(DEBUG, TAG, "wake up..");
60         }
61
62
63
64         // check stop flag
65         if (thread->isStop)
66         {
67             // mutex unlock
68             ca_mutex_unlock(thread->threadMutex);
69             continue;
70         }
71
72         // get data
73         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
74         // mutex unlock
75         ca_mutex_unlock(thread->threadMutex);
76         if (NULL == message)
77         {
78             continue;
79         }
80
81         // process data
82         thread->threadTask(message->msg);
83
84         // free
85         if (NULL != thread->destroy)
86         {
87             thread->destroy(message->msg, message->size);
88         }
89         else
90         {
91             OICFree(message->msg);
92         }
93
94         OICFree(message);
95     }
96
97     // remove all remained list data.
98     while (u_queue_get_size(thread->dataQueue) > 0)
99     {
100         // get data
101         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
102
103         // free
104         if(NULL != message)
105         {
106             if (NULL != thread->destroy)
107             {
108                 thread->destroy(message->msg, message->size);
109             }
110             else
111             {
112                 OICFree(message->msg);
113             }
114
115             OICFree(message);
116         }
117     }
118
119     ca_mutex_lock(thread->threadMutex);
120     ca_cond_signal(thread->threadCond);
121     ca_mutex_unlock(thread->threadMutex);
122
123     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
124 }
125
126 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
127                                       CAThreadTask task, CADataDestroyFunction destroy)
128 {
129     if (NULL == thread)
130     {
131         OIC_LOG(ERROR, TAG, "thread instance is empty..");
132         return CA_STATUS_INVALID_PARAM;
133     }
134
135     if (NULL == handle)
136     {
137         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
138         return CA_STATUS_INVALID_PARAM;
139     }
140
141     OIC_LOG(DEBUG, TAG, "thread initialize..");
142
143     // set send thread data
144     thread->threadPool = handle;
145     thread->dataQueue = u_queue_create();
146     thread->threadMutex = ca_mutex_new();
147     thread->threadCond = ca_cond_new();
148     thread->isStop = true;
149     thread->threadTask = task;
150     thread->destroy = destroy;
151     if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
152         goto ERROR_MEM_FAILURE;
153
154     return CA_STATUS_OK;
155     ERROR_MEM_FAILURE:
156     if(thread->dataQueue)
157     {
158         u_queue_delete(thread->dataQueue);
159         thread->dataQueue = NULL;
160     }
161     if(thread->threadMutex)
162     {
163         ca_mutex_free(thread->threadMutex);
164         thread->threadMutex = NULL;
165     }
166     if(thread->threadCond)
167     {
168         ca_cond_free(thread->threadCond);
169         thread->threadCond = NULL;
170     }
171     return CA_MEMORY_ALLOC_FAILED;
172
173 }
174
175 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
176 {
177     if (NULL == thread)
178     {
179         OIC_LOG(ERROR, TAG, "thread instance is empty..");
180         return CA_STATUS_INVALID_PARAM;
181     }
182
183     if (NULL == thread->threadPool)
184     {
185         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
186         return CA_STATUS_INVALID_PARAM;
187     }
188
189     if (false == thread->isStop) //Queueing thread already running
190     {
191         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
192         return CA_STATUS_OK;
193     }
194
195     // mutex lock
196     ca_mutex_lock(thread->threadMutex);
197     thread->isStop = false;
198     // mutex unlock
199     ca_mutex_unlock(thread->threadMutex);
200
201     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
202                                             thread);
203     if (res != CA_STATUS_OK)
204     {
205         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
206     }
207
208     return res;
209 }
210
211 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
212 {
213     if (NULL == thread)
214     {
215         OIC_LOG(ERROR, TAG, "thread instance is empty..");
216         return CA_STATUS_INVALID_PARAM;
217     }
218
219     if (NULL == data || 0 == size)
220     {
221         OIC_LOG(ERROR, TAG, "data is empty..");
222
223         return CA_STATUS_INVALID_PARAM;
224     }
225
226     // create thread data
227     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
228
229     if (NULL == message)
230     {
231         OIC_LOG(ERROR, TAG, "memory error!!");
232         return CA_MEMORY_ALLOC_FAILED;
233     }
234
235     message->msg = data;
236     message->size = size;
237
238     // mutex lock
239     ca_mutex_lock(thread->threadMutex);
240
241     // add thread data into list
242     u_queue_add_element(thread->dataQueue, message);
243
244     // notity the thread
245     ca_cond_signal(thread->threadCond);
246
247     // mutex unlock
248     ca_mutex_unlock(thread->threadMutex);
249
250     return CA_STATUS_OK;
251 }
252
253 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
254 {
255     if (NULL == thread)
256     {
257         OIC_LOG(ERROR, TAG, "thread instance is empty..");
258         return CA_STATUS_INVALID_PARAM;
259     }
260
261     OIC_LOG(DEBUG, TAG, "thread destroy..");
262
263     ca_mutex_free(thread->threadMutex);
264     thread->threadMutex = NULL;
265     ca_cond_free(thread->threadCond);
266     u_queue_delete(thread->dataQueue);
267
268     return CA_STATUS_OK;
269 }
270
271 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
272 {
273     if (NULL == thread)
274     {
275         OIC_LOG(ERROR, TAG, "thread instance is empty..");
276         return CA_STATUS_INVALID_PARAM;
277     }
278
279     OIC_LOG(DEBUG, TAG, "thread stop request!!");
280
281     if (!thread->isStop)
282     {
283         // mutex lock
284         ca_mutex_lock(thread->threadMutex);
285
286         // set stop flag
287         thread->isStop = true;
288
289         // notify the thread
290         ca_cond_signal(thread->threadCond);
291
292         ca_cond_wait(thread->threadCond, thread->threadMutex);
293
294         // mutex unlock
295         ca_mutex_unlock(thread->threadMutex);
296     }
297
298     return CA_STATUS_OK;
299 }