Imported Upstream version 0.9.2
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA_QING")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (NULL == thread)
41     {
42         OIC_LOG(ERROR, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         ca_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG(DEBUG, TAG, "wait..");
56
57             // wait
58             ca_cond_wait(thread->threadCond, thread->threadMutex);
59
60             OIC_LOG(DEBUG, TAG, "wake up..");
61         }
62
63         // mutex unlock
64         ca_mutex_unlock(thread->threadMutex);
65
66         // check stop flag
67         if (thread->isStop)
68         {
69             continue;
70         }
71
72         // get data
73         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
74         if (NULL == message)
75         {
76             continue;
77         }
78
79         // process data
80         thread->threadTask(message->msg);
81
82         // free
83         if (NULL != thread->destroy)
84         {
85             thread->destroy(message->msg, message->size);
86         }
87         else
88         {
89             OICFree(message->msg);
90         }
91
92         OICFree(message);
93     }
94
95     // remove all remained list data.
96     while (u_queue_get_size(thread->dataQueue) > 0)
97     {
98         // get data
99         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
100
101         // free
102         if(NULL != message)
103         {
104             if (NULL != thread->destroy)
105             {
106                 thread->destroy(message->msg, message->size);
107             }
108             else
109             {
110                 OICFree(message->msg);
111             }
112
113             OICFree(message);
114         }
115     }
116
117     ca_mutex_lock(thread->threadMutex);
118     ca_cond_signal(thread->threadCond);
119     ca_mutex_unlock(thread->threadMutex);
120
121     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
122 }
123
124 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
125                                       CAThreadTask task, CADataDestroyFunction destroy)
126 {
127     if (NULL == thread)
128     {
129         OIC_LOG(ERROR, TAG, "thread instance is empty..");
130         return CA_STATUS_INVALID_PARAM;
131     }
132
133     if (NULL == handle)
134     {
135         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
136         return CA_STATUS_INVALID_PARAM;
137     }
138
139     OIC_LOG(DEBUG, TAG, "thread initialize..");
140
141     // set send thread data
142     thread->threadPool = handle;
143     thread->dataQueue = u_queue_create();
144     thread->threadMutex = ca_mutex_new();
145     thread->threadCond = ca_cond_new();
146     thread->isStop = true;
147     thread->threadTask = task;
148     thread->destroy = destroy;
149     if(NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
150         goto ERROR_MEM_FAILURE;
151
152     return CA_STATUS_OK;
153     ERROR_MEM_FAILURE:
154     if(thread->dataQueue)
155     {
156         u_queue_delete(thread->dataQueue);
157         thread->dataQueue = NULL;
158     }
159     if(thread->threadMutex)
160     {
161         ca_mutex_free(thread->threadMutex);
162         thread->threadMutex = NULL;
163     }
164     if(thread->threadCond)
165     {
166         ca_cond_free(thread->threadCond);
167         thread->threadCond = NULL;
168     }
169     return CA_MEMORY_ALLOC_FAILED;
170
171 }
172
173 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
174 {
175     if (NULL == thread)
176     {
177         OIC_LOG(ERROR, TAG, "thread instance is empty..");
178         return CA_STATUS_INVALID_PARAM;
179     }
180
181     if (NULL == thread->threadPool)
182     {
183         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
184         return CA_STATUS_INVALID_PARAM;
185     }
186
187     if (false == thread->isStop) //Queueing thread already running
188     {
189         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
190         return CA_STATUS_OK;
191     }
192
193     // mutex lock
194     ca_mutex_lock(thread->threadMutex);
195     thread->isStop = false;
196     // mutex unlock
197     ca_mutex_unlock(thread->threadMutex);
198
199     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
200                                             thread);
201     if (res != CA_STATUS_OK)
202     {
203         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
204     }
205
206     return res;
207 }
208
209 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
210 {
211     if (NULL == thread)
212     {
213         OIC_LOG(ERROR, TAG, "thread instance is empty..");
214         return CA_STATUS_INVALID_PARAM;
215     }
216
217     if (NULL == data || 0 == size)
218     {
219         OIC_LOG(ERROR, TAG, "data is empty..");
220
221         return CA_STATUS_INVALID_PARAM;
222     }
223
224     // create thread data
225     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
226
227     if (NULL == message)
228     {
229         OIC_LOG(ERROR, TAG, "memory error!!");
230         return CA_MEMORY_ALLOC_FAILED;
231     }
232
233     message->msg = data;
234     message->size = size;
235
236     // mutex lock
237     ca_mutex_lock(thread->threadMutex);
238
239     // add thread data into list
240     u_queue_add_element(thread->dataQueue, message);
241
242     // notity the thread
243     ca_cond_signal(thread->threadCond);
244
245     // mutex unlock
246     ca_mutex_unlock(thread->threadMutex);
247
248     return CA_STATUS_OK;
249 }
250
251 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
252 {
253     if (NULL == thread)
254     {
255         OIC_LOG(ERROR, TAG, "thread instance is empty..");
256         return CA_STATUS_INVALID_PARAM;
257     }
258
259     OIC_LOG(DEBUG, TAG, "thread destroy..");
260
261     ca_mutex_free(thread->threadMutex);
262     thread->threadMutex = NULL;
263     ca_cond_free(thread->threadCond);
264     u_queue_delete(thread->dataQueue);
265
266     return CA_STATUS_OK;
267 }
268
269 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
270 {
271     if (NULL == thread)
272     {
273         OIC_LOG(ERROR, TAG, "thread instance is empty..");
274         return CA_STATUS_INVALID_PARAM;
275     }
276
277     OIC_LOG(DEBUG, TAG, "thread stop request!!");
278
279     if (!thread->isStop)
280     {
281         // mutex lock
282         ca_mutex_lock(thread->threadMutex);
283
284         // set stop flag
285         thread->isStop = true;
286
287         // notify the thread
288         ca_cond_signal(thread->threadCond);
289
290         ca_cond_wait(thread->threadCond, thread->threadMutex);
291
292         // mutex unlock
293         ca_mutex_unlock(thread->threadMutex);
294     }
295
296     return CA_STATUS_OK;
297 }