Merge "Merge branch 'master' into simulator" into simulator
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA_QING")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (NULL == thread)
41     {
42         OIC_LOG(ERROR, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         ca_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG(DEBUG, TAG, "wait..");
56
57             // wait
58             ca_cond_wait(thread->threadCond, thread->threadMutex);
59
60             OIC_LOG(DEBUG, TAG, "wake up..");
61         }
62
63
64
65         // check stop flag
66         if (thread->isStop)
67         {
68             // mutex unlock
69             ca_mutex_unlock(thread->threadMutex);
70             continue;
71         }
72
73         // get data
74         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
75         // mutex unlock
76         ca_mutex_unlock(thread->threadMutex);
77         if (NULL == message)
78         {
79             continue;
80         }
81
82         // process data
83         thread->threadTask(message->msg);
84
85         // free
86         if (NULL != thread->destroy)
87         {
88             thread->destroy(message->msg, message->size);
89         }
90         else
91         {
92             OICFree(message->msg);
93         }
94
95         OICFree(message);
96     }
97
98     // remove all remained list data.
99     while (u_queue_get_size(thread->dataQueue) > 0)
100     {
101         // get data
102         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
103
104         // free
105         if(NULL != message)
106         {
107             if (NULL != thread->destroy)
108             {
109                 thread->destroy(message->msg, message->size);
110             }
111             else
112             {
113                 OICFree(message->msg);
114             }
115
116             OICFree(message);
117         }
118     }
119
120     ca_mutex_lock(thread->threadMutex);
121     ca_cond_signal(thread->threadCond);
122     ca_mutex_unlock(thread->threadMutex);
123
124     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
125 }
126
127 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
128                                       CAThreadTask task, CADataDestroyFunction destroy)
129 {
130     if (NULL == thread)
131     {
132         OIC_LOG(ERROR, TAG, "thread instance is empty..");
133         return CA_STATUS_INVALID_PARAM;
134     }
135
136     if (NULL == handle)
137     {
138         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
139         return CA_STATUS_INVALID_PARAM;
140     }
141
142     OIC_LOG(DEBUG, TAG, "thread initialize..");
143
144     // set send thread data
145     thread->threadPool = handle;
146     thread->dataQueue = u_queue_create();
147     thread->threadMutex = ca_mutex_new();
148     thread->threadCond = ca_cond_new();
149     thread->isStop = true;
150     thread->threadTask = task;
151     thread->destroy = destroy;
152     if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
153         goto ERROR_MEM_FAILURE;
154
155     return CA_STATUS_OK;
156     ERROR_MEM_FAILURE:
157     if(thread->dataQueue)
158     {
159         u_queue_delete(thread->dataQueue);
160         thread->dataQueue = NULL;
161     }
162     if(thread->threadMutex)
163     {
164         ca_mutex_free(thread->threadMutex);
165         thread->threadMutex = NULL;
166     }
167     if(thread->threadCond)
168     {
169         ca_cond_free(thread->threadCond);
170         thread->threadCond = NULL;
171     }
172     return CA_MEMORY_ALLOC_FAILED;
173
174 }
175
176 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
177 {
178     if (NULL == thread)
179     {
180         OIC_LOG(ERROR, TAG, "thread instance is empty..");
181         return CA_STATUS_INVALID_PARAM;
182     }
183
184     if (NULL == thread->threadPool)
185     {
186         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
187         return CA_STATUS_INVALID_PARAM;
188     }
189
190     if (false == thread->isStop) //Queueing thread already running
191     {
192         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
193         return CA_STATUS_OK;
194     }
195
196     // mutex lock
197     ca_mutex_lock(thread->threadMutex);
198     thread->isStop = false;
199     // mutex unlock
200     ca_mutex_unlock(thread->threadMutex);
201
202     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
203                                             thread);
204     if (res != CA_STATUS_OK)
205     {
206         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
207     }
208
209     return res;
210 }
211
212 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
213 {
214     if (NULL == thread)
215     {
216         OIC_LOG(ERROR, TAG, "thread instance is empty..");
217         return CA_STATUS_INVALID_PARAM;
218     }
219
220     if (NULL == data || 0 == size)
221     {
222         OIC_LOG(ERROR, TAG, "data is empty..");
223
224         return CA_STATUS_INVALID_PARAM;
225     }
226
227     // create thread data
228     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
229
230     if (NULL == message)
231     {
232         OIC_LOG(ERROR, TAG, "memory error!!");
233         return CA_MEMORY_ALLOC_FAILED;
234     }
235
236     message->msg = data;
237     message->size = size;
238
239     // mutex lock
240     ca_mutex_lock(thread->threadMutex);
241
242     // add thread data into list
243     u_queue_add_element(thread->dataQueue, message);
244
245     // notity the thread
246     ca_cond_signal(thread->threadCond);
247
248     // mutex unlock
249     ca_mutex_unlock(thread->threadMutex);
250
251     return CA_STATUS_OK;
252 }
253
254 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
255 {
256     if (NULL == thread)
257     {
258         OIC_LOG(ERROR, TAG, "thread instance is empty..");
259         return CA_STATUS_INVALID_PARAM;
260     }
261
262     OIC_LOG(DEBUG, TAG, "thread destroy..");
263
264     ca_mutex_free(thread->threadMutex);
265     thread->threadMutex = NULL;
266     ca_cond_free(thread->threadCond);
267     u_queue_delete(thread->dataQueue);
268
269     return CA_STATUS_OK;
270 }
271
272 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
273 {
274     if (NULL == thread)
275     {
276         OIC_LOG(ERROR, TAG, "thread instance is empty..");
277         return CA_STATUS_INVALID_PARAM;
278     }
279
280     OIC_LOG(DEBUG, TAG, "thread stop request!!");
281
282     if (!thread->isStop)
283     {
284         // mutex lock
285         ca_mutex_lock(thread->threadMutex);
286
287         // set stop flag
288         thread->isStop = true;
289
290         // notify the thread
291         ca_cond_signal(thread->threadCond);
292
293         ca_cond_wait(thread->threadCond, thread->threadMutex);
294
295         // mutex unlock
296         ca_mutex_unlock(thread->threadMutex);
297     }
298
299     return CA_STATUS_OK;
300 }