Merge branch 'master' into windows-port
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #ifdef HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30
31 #include "caqueueingthread.h"
32 #include "oic_malloc.h"
33 #include "logger.h"
34
35 #define TAG PCF("OIC_CA_QING")
36
37 static void CAQueueingThreadBaseRoutine(void *threadValue)
38 {
39     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
40
41     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
42
43     if (NULL == thread)
44     {
45         OIC_LOG(ERROR, TAG, "thread data passing error!!");
46
47         return;
48     }
49
50     while (!thread->isStop)
51     {
52         // mutex lock
53         ca_mutex_lock(thread->threadMutex);
54
55         // if queue is empty, thread will wait
56         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
57         {
58             OIC_LOG(DEBUG, TAG, "wait..");
59
60             // wait
61             ca_cond_wait(thread->threadCond, thread->threadMutex);
62
63             OIC_LOG(DEBUG, TAG, "wake up..");
64         }
65
66         // check stop flag
67         if (thread->isStop)
68         {
69             // mutex unlock
70             ca_mutex_unlock(thread->threadMutex);
71             continue;
72         }
73
74         // get data
75         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
76         // mutex unlock
77         ca_mutex_unlock(thread->threadMutex);
78         if (NULL == message)
79         {
80             continue;
81         }
82
83         // process data
84         thread->threadTask(message->msg);
85
86         // free
87         if (NULL != thread->destroy)
88         {
89             thread->destroy(message->msg, message->size);
90         }
91         else
92         {
93             OICFree(message->msg);
94         }
95
96         OICFree(message);
97     }
98
99     ca_mutex_lock(thread->threadMutex);
100     ca_cond_signal(thread->threadCond);
101     ca_mutex_unlock(thread->threadMutex);
102
103     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
104 }
105
106 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
107                                       CAThreadTask task, CADataDestroyFunction destroy)
108 {
109     if (NULL == thread)
110     {
111         OIC_LOG(ERROR, TAG, "thread instance is empty..");
112         return CA_STATUS_INVALID_PARAM;
113     }
114
115     if (NULL == handle)
116     {
117         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
118         return CA_STATUS_INVALID_PARAM;
119     }
120
121     OIC_LOG(DEBUG, TAG, "thread initialize..");
122
123     // set send thread data
124     thread->threadPool = handle;
125     thread->dataQueue = u_queue_create();
126     thread->threadMutex = ca_mutex_new();
127     thread->threadCond = ca_cond_new();
128     thread->isStop = true;
129     thread->threadTask = task;
130     thread->destroy = destroy;
131     if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
132         goto ERROR_MEM_FAILURE;
133
134     return CA_STATUS_OK;
135     ERROR_MEM_FAILURE:
136     if(thread->dataQueue)
137     {
138         u_queue_delete(thread->dataQueue);
139         thread->dataQueue = NULL;
140     }
141     if(thread->threadMutex)
142     {
143         ca_mutex_free(thread->threadMutex);
144         thread->threadMutex = NULL;
145     }
146     if(thread->threadCond)
147     {
148         ca_cond_free(thread->threadCond);
149         thread->threadCond = NULL;
150     }
151     return CA_MEMORY_ALLOC_FAILED;
152
153 }
154
155 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
156 {
157     if (NULL == thread)
158     {
159         OIC_LOG(ERROR, TAG, "thread instance is empty..");
160         return CA_STATUS_INVALID_PARAM;
161     }
162
163     if (NULL == thread->threadPool)
164     {
165         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
166         return CA_STATUS_INVALID_PARAM;
167     }
168
169     if (false == thread->isStop) //Queueing thread already running
170     {
171         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
172         return CA_STATUS_OK;
173     }
174
175     // mutex lock
176     ca_mutex_lock(thread->threadMutex);
177     thread->isStop = false;
178     // mutex unlock
179     ca_mutex_unlock(thread->threadMutex);
180
181     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
182                                             thread);
183     if (res != CA_STATUS_OK)
184     {
185         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
186     }
187
188     return res;
189 }
190
191 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
192 {
193     if (NULL == thread)
194     {
195         OIC_LOG(ERROR, TAG, "thread instance is empty..");
196         return CA_STATUS_INVALID_PARAM;
197     }
198
199     if (NULL == data || 0 == size)
200     {
201         OIC_LOG(ERROR, TAG, "data is empty..");
202
203         return CA_STATUS_INVALID_PARAM;
204     }
205
206     // create thread data
207     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
208
209     if (NULL == message)
210     {
211         OIC_LOG(ERROR, TAG, "memory error!!");
212         return CA_MEMORY_ALLOC_FAILED;
213     }
214
215     message->msg = data;
216     message->size = size;
217
218     // mutex lock
219     ca_mutex_lock(thread->threadMutex);
220
221     // add thread data into list
222     u_queue_add_element(thread->dataQueue, message);
223
224     // notity the thread
225     ca_cond_signal(thread->threadCond);
226
227     // mutex unlock
228     ca_mutex_unlock(thread->threadMutex);
229
230     return CA_STATUS_OK;
231 }
232
233 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
234 {
235     if (NULL == thread)
236     {
237         OIC_LOG(ERROR, TAG, "thread instance is empty..");
238         return CA_STATUS_INVALID_PARAM;
239     }
240
241     OIC_LOG(DEBUG, TAG, "thread destroy..");
242
243     ca_mutex_free(thread->threadMutex);
244     thread->threadMutex = NULL;
245     ca_cond_free(thread->threadCond);
246
247     // remove all remained list data.
248     while (u_queue_get_size(thread->dataQueue) > 0)
249     {
250         // get data
251         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
252
253         // free
254         if(NULL != message)
255         {
256             if (NULL != thread->destroy)
257             {
258                 thread->destroy(message->msg, message->size);
259             }
260             else
261             {
262                 OICFree(message->msg);
263             }
264
265             OICFree(message);
266         }
267     }
268
269     u_queue_delete(thread->dataQueue);
270
271     return CA_STATUS_OK;
272 }
273
274 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
275 {
276     if (NULL == thread)
277     {
278         OIC_LOG(ERROR, TAG, "thread instance is empty..");
279         return CA_STATUS_INVALID_PARAM;
280     }
281
282     OIC_LOG(DEBUG, TAG, "thread stop request!!");
283
284     if (!thread->isStop)
285     {
286         // mutex lock
287         ca_mutex_lock(thread->threadMutex);
288
289         // set stop flag
290         thread->isStop = true;
291
292         // notify the thread
293         ca_cond_signal(thread->threadCond);
294
295         ca_cond_wait(thread->threadCond, thread->threadMutex);
296
297         // mutex unlock
298         ca_mutex_unlock(thread->threadMutex);
299     }
300
301     return CA_STATUS_OK;
302 }