7516a12ae70b5de2e6e32e65c177ff80a276b352
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26
27 #include "caqueueingthread.h"
28 #include "oic_malloc.h"
29 #include "logger.h"
30
31 #define TAG PCF("OIC_CA_QING")
32
33 static void CAQueueingThreadBaseRoutine(void *threadValue)
34 {
35     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
36
37     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
38
39     if (NULL == thread)
40     {
41         OIC_LOG(ERROR, TAG, "thread data passing error!!");
42         return;
43     }
44
45     while (!thread->isStop)
46     {
47         // mutex lock
48         ca_mutex_lock(thread->threadMutex);
49
50         // if queue is empty, thread will wait
51         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
52         {
53             OIC_LOG(DEBUG, TAG, "wait..");
54
55             // wait
56             ca_cond_wait(thread->threadCond, thread->threadMutex);
57
58             OIC_LOG(DEBUG, TAG, "wake up..");
59         }
60
61         // check stop flag
62         if (thread->isStop)
63         {
64             // mutex unlock
65             ca_mutex_unlock(thread->threadMutex);
66             continue;
67         }
68
69         // get data
70         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
71         // mutex unlock
72         ca_mutex_unlock(thread->threadMutex);
73         if (NULL == message)
74         {
75             continue;
76         }
77
78         // process data
79         thread->threadTask(message->msg);
80
81         // free
82         if (NULL != thread->destroy)
83         {
84             thread->destroy(message->msg, message->size);
85         }
86         else
87         {
88             OICFree(message->msg);
89         }
90
91         OICFree(message);
92     }
93
94     ca_mutex_lock(thread->threadMutex);
95     ca_cond_signal(thread->threadCond);
96     ca_mutex_unlock(thread->threadMutex);
97
98     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
99 }
100
101 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
102                                       CAThreadTask task, CADataDestroyFunction destroy)
103 {
104     if (NULL == thread)
105     {
106         OIC_LOG(ERROR, TAG, "thread instance is empty..");
107         return CA_STATUS_INVALID_PARAM;
108     }
109
110     if (NULL == handle)
111     {
112         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
113         return CA_STATUS_INVALID_PARAM;
114     }
115
116     OIC_LOG(DEBUG, TAG, "thread initialize..");
117
118     // set send thread data
119     thread->threadPool = handle;
120     thread->dataQueue = u_queue_create();
121     thread->threadMutex = ca_mutex_new();
122     thread->threadCond = ca_cond_new();
123     thread->isStop = true;
124     thread->threadTask = task;
125     thread->destroy = destroy;
126     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
127     {
128         goto ERROR_MEM_FAILURE;
129     }
130
131     return CA_STATUS_OK;
132
133 ERROR_MEM_FAILURE:
134     if (thread->dataQueue)
135     {
136         u_queue_delete(thread->dataQueue);
137         thread->dataQueue = NULL;
138     }
139     if (thread->threadMutex)
140     {
141         ca_mutex_free(thread->threadMutex);
142         thread->threadMutex = NULL;
143     }
144     if (thread->threadCond)
145     {
146         ca_cond_free(thread->threadCond);
147         thread->threadCond = NULL;
148     }
149     return CA_MEMORY_ALLOC_FAILED;
150 }
151
152 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
153 {
154     if (NULL == thread)
155     {
156         OIC_LOG(ERROR, TAG, "thread instance is empty..");
157         return CA_STATUS_INVALID_PARAM;
158     }
159
160     if (NULL == thread->threadPool)
161     {
162         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
163         return CA_STATUS_INVALID_PARAM;
164     }
165
166     if (false == thread->isStop) //Queueing thread already running
167     {
168         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
169         return CA_STATUS_OK;
170     }
171
172     // mutex lock
173     ca_mutex_lock(thread->threadMutex);
174     thread->isStop = false;
175     // mutex unlock
176     ca_mutex_unlock(thread->threadMutex);
177
178     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
179                                             thread);
180     if (res != CA_STATUS_OK)
181     {
182         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
183     }
184
185     return res;
186 }
187
188 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
189 {
190     if (NULL == thread)
191     {
192         OIC_LOG(ERROR, TAG, "thread instance is empty..");
193         return CA_STATUS_INVALID_PARAM;
194     }
195
196     if (NULL == data || 0 == size)
197     {
198         OIC_LOG(ERROR, TAG, "data is empty..");
199
200         return CA_STATUS_INVALID_PARAM;
201     }
202
203     // create thread data
204     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
205
206     if (NULL == message)
207     {
208         OIC_LOG(ERROR, TAG, "memory error!!");
209         return CA_MEMORY_ALLOC_FAILED;
210     }
211
212     message->msg = data;
213     message->size = size;
214
215     // mutex lock
216     ca_mutex_lock(thread->threadMutex);
217
218     // add thread data into list
219     u_queue_add_element(thread->dataQueue, message);
220
221     // notity the thread
222     ca_cond_signal(thread->threadCond);
223
224     // mutex unlock
225     ca_mutex_unlock(thread->threadMutex);
226
227     return CA_STATUS_OK;
228 }
229
230 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
231 {
232     if (NULL == thread)
233     {
234         OIC_LOG(ERROR, TAG, "thread instance is empty..");
235         return CA_STATUS_INVALID_PARAM;
236     }
237
238     OIC_LOG(DEBUG, TAG, "thread destroy..");
239
240     ca_mutex_free(thread->threadMutex);
241     thread->threadMutex = NULL;
242     ca_cond_free(thread->threadCond);
243
244     // remove all remained list data.
245     while (u_queue_get_size(thread->dataQueue) > 0)
246     {
247         // get data
248         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
249
250         // free
251         if (NULL != message)
252         {
253             if (NULL != thread->destroy)
254             {
255                 thread->destroy(message->msg, message->size);
256             }
257             else
258             {
259                 OICFree(message->msg);
260             }
261
262             OICFree(message);
263         }
264     }
265
266     u_queue_delete(thread->dataQueue);
267
268     return CA_STATUS_OK;
269 }
270
271 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
272 {
273     if (NULL == thread)
274     {
275         OIC_LOG(ERROR, TAG, "thread instance is empty..");
276         return CA_STATUS_INVALID_PARAM;
277     }
278
279     OIC_LOG(DEBUG, TAG, "thread stop request!!");
280
281     if (!thread->isStop)
282     {
283         // mutex lock
284         ca_mutex_lock(thread->threadMutex);
285
286         // set stop flag
287         thread->isStop = true;
288
289         // notify the thread
290         ca_cond_signal(thread->threadCond);
291
292         ca_cond_wait(thread->threadCond, thread->threadMutex);
293
294         // mutex unlock
295         ca_mutex_unlock(thread->threadMutex);
296     }
297
298     return CA_STATUS_OK;
299 }