Merge branch '1.1-rel'
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26
27 #include "caqueueingthread.h"
28 #include "oic_malloc.h"
29 #include "logger.h"
30
31 #define TAG PCF("OIC_CA_QING")
32
33 static void CAQueueingThreadBaseRoutine(void *threadValue)
34 {
35     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
36
37     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
38
39     if (NULL == thread)
40     {
41         OIC_LOG(ERROR, TAG, "thread data passing error!!");
42
43         return;
44     }
45
46     while (!thread->isStop)
47     {
48         // mutex lock
49         ca_mutex_lock(thread->threadMutex);
50
51         // if queue is empty, thread will wait
52         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
53         {
54             OIC_LOG(DEBUG, TAG, "wait..");
55
56             // wait
57             ca_cond_wait(thread->threadCond, thread->threadMutex);
58
59             OIC_LOG(DEBUG, TAG, "wake up..");
60         }
61
62         // check stop flag
63         if (thread->isStop)
64         {
65             // mutex unlock
66             ca_mutex_unlock(thread->threadMutex);
67             continue;
68         }
69
70         // get data
71         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
72         // mutex unlock
73         ca_mutex_unlock(thread->threadMutex);
74         if (NULL == message)
75         {
76             continue;
77         }
78
79         // process data
80         thread->threadTask(message->msg);
81
82         // free
83         if (NULL != thread->destroy)
84         {
85             thread->destroy(message->msg, message->size);
86         }
87         else
88         {
89             OICFree(message->msg);
90         }
91
92         OICFree(message);
93     }
94
95     ca_mutex_lock(thread->threadMutex);
96     ca_cond_signal(thread->threadCond);
97     ca_mutex_unlock(thread->threadMutex);
98
99     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
100 }
101
102 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
103                                       CAThreadTask task, CADataDestroyFunction destroy)
104 {
105     if (NULL == thread)
106     {
107         OIC_LOG(ERROR, TAG, "thread instance is empty..");
108         return CA_STATUS_INVALID_PARAM;
109     }
110
111     if (NULL == handle)
112     {
113         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
114         return CA_STATUS_INVALID_PARAM;
115     }
116
117     OIC_LOG(DEBUG, TAG, "thread initialize..");
118
119     // set send thread data
120     thread->threadPool = handle;
121     thread->dataQueue = u_queue_create();
122     thread->threadMutex = ca_mutex_new();
123     thread->threadCond = ca_cond_new();
124     thread->isStop = true;
125     thread->threadTask = task;
126     thread->destroy = destroy;
127     if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
128         goto ERROR_MEM_FAILURE;
129
130     return CA_STATUS_OK;
131     ERROR_MEM_FAILURE:
132     if(thread->dataQueue)
133     {
134         u_queue_delete(thread->dataQueue);
135         thread->dataQueue = NULL;
136     }
137     if(thread->threadMutex)
138     {
139         ca_mutex_free(thread->threadMutex);
140         thread->threadMutex = NULL;
141     }
142     if(thread->threadCond)
143     {
144         ca_cond_free(thread->threadCond);
145         thread->threadCond = NULL;
146     }
147     return CA_MEMORY_ALLOC_FAILED;
148
149 }
150
151 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
152 {
153     if (NULL == thread)
154     {
155         OIC_LOG(ERROR, TAG, "thread instance is empty..");
156         return CA_STATUS_INVALID_PARAM;
157     }
158
159     if (NULL == thread->threadPool)
160     {
161         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
162         return CA_STATUS_INVALID_PARAM;
163     }
164
165     if (false == thread->isStop) //Queueing thread already running
166     {
167         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
168         return CA_STATUS_OK;
169     }
170
171     // mutex lock
172     ca_mutex_lock(thread->threadMutex);
173     thread->isStop = false;
174     // mutex unlock
175     ca_mutex_unlock(thread->threadMutex);
176
177     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
178                                             thread);
179     if (res != CA_STATUS_OK)
180     {
181         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
182     }
183
184     return res;
185 }
186
187 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
188 {
189     if (NULL == thread)
190     {
191         OIC_LOG(ERROR, TAG, "thread instance is empty..");
192         return CA_STATUS_INVALID_PARAM;
193     }
194
195     if (NULL == data || 0 == size)
196     {
197         OIC_LOG(ERROR, TAG, "data is empty..");
198
199         return CA_STATUS_INVALID_PARAM;
200     }
201
202     // create thread data
203     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
204
205     if (NULL == message)
206     {
207         OIC_LOG(ERROR, TAG, "memory error!!");
208         return CA_MEMORY_ALLOC_FAILED;
209     }
210
211     message->msg = data;
212     message->size = size;
213
214     // mutex lock
215     ca_mutex_lock(thread->threadMutex);
216
217     // add thread data into list
218     u_queue_add_element(thread->dataQueue, message);
219
220     // notity the thread
221     ca_cond_signal(thread->threadCond);
222
223     // mutex unlock
224     ca_mutex_unlock(thread->threadMutex);
225
226     return CA_STATUS_OK;
227 }
228
229 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
230 {
231     if (NULL == thread)
232     {
233         OIC_LOG(ERROR, TAG, "thread instance is empty..");
234         return CA_STATUS_INVALID_PARAM;
235     }
236
237     OIC_LOG(DEBUG, TAG, "thread destroy..");
238
239     ca_mutex_free(thread->threadMutex);
240     thread->threadMutex = NULL;
241     ca_cond_free(thread->threadCond);
242
243     // remove all remained list data.
244     while (u_queue_get_size(thread->dataQueue) > 0)
245     {
246         // get data
247         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
248
249         // free
250         if(NULL != message)
251         {
252             if (NULL != thread->destroy)
253             {
254                 thread->destroy(message->msg, message->size);
255             }
256             else
257             {
258                 OICFree(message->msg);
259             }
260
261             OICFree(message);
262         }
263     }
264
265     u_queue_delete(thread->dataQueue);
266
267     return CA_STATUS_OK;
268 }
269
270 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
271 {
272     if (NULL == thread)
273     {
274         OIC_LOG(ERROR, TAG, "thread instance is empty..");
275         return CA_STATUS_INVALID_PARAM;
276     }
277
278     OIC_LOG(DEBUG, TAG, "thread stop request!!");
279
280     if (!thread->isStop)
281     {
282         // mutex lock
283         ca_mutex_lock(thread->threadMutex);
284
285         // set stop flag
286         thread->isStop = true;
287
288         // notify the thread
289         ca_cond_signal(thread->threadCond);
290
291         ca_cond_wait(thread->threadCond, thread->threadMutex);
292
293         // mutex unlock
294         ca_mutex_unlock(thread->threadMutex);
295     }
296
297     return CA_STATUS_OK;
298 }