[CA] [All] Klockwork , Prevent , Memory leaks fixed
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (thread == NULL)
41     {
42         OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         u_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG_V(DEBUG, TAG, "wait..");
56
57             // wait
58             u_cond_wait(thread->threadCond, thread->threadMutex);
59
60             OIC_LOG_V(DEBUG, TAG, "wake up..");
61         }
62
63         // mutex unlock
64         u_mutex_unlock(thread->threadMutex);
65
66         // check stop flag
67         if (thread->isStop)
68             continue;
69
70         // get data
71         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
72         if (message == NULL)
73         {
74             continue;
75         }
76
77         void *data = message->msg;
78
79         // process data
80         thread->threadTask(data);
81
82         // free
83         OICFree(message);
84     }
85
86     u_cond_signal(thread->threadCond);
87
88     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
89 }
90
91 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
92                                       CAThreadTask task)
93 {
94     if (thread == NULL)
95     {
96         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
97         return CA_STATUS_FAILED;
98     }
99
100     if (handle == NULL)
101     {
102         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
103         return CA_STATUS_FAILED;
104     }
105
106     OIC_LOG_V(DEBUG, TAG, "thread initialize..");
107
108     memset(thread, 0, sizeof(CAQueueingThread_t));
109
110     // mutex init
111     u_mutex_init();
112
113     // set send thread data
114     thread->threadPool = handle;
115     thread->dataQueue = u_queue_create();
116     thread->threadMutex = u_mutex_new();
117     thread->threadCond = u_cond_new();
118     thread->isStop = CA_TRUE;
119     thread->threadTask = task;
120
121     return CA_STATUS_OK;
122 }
123
124 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
125 {
126     if (thread == NULL)
127     {
128         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
129         return CA_STATUS_FAILED;
130     }
131
132     if (thread->threadPool == NULL)
133     {
134         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
135         return CA_STATUS_FAILED;
136     }
137
138     if (CA_FALSE == thread->isStop) //Queueing thread already running
139     {
140         OIC_LOG_V(DEBUG, TAG, "queueing thread already running..");
141         return CA_STATUS_OK;
142     }
143
144     thread->isStop = CA_FALSE;
145     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
146                                             thread);
147     if (res != CA_STATUS_OK)
148     {
149         OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
150         thread->isStop = CA_TRUE;
151         return res;
152     }
153
154     return res;
155 }
156
157 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
158 {
159     if (thread == NULL)
160     {
161         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
162         return CA_STATUS_FAILED;
163     }
164
165     if (data == NULL || size == 0)
166     {
167         OIC_LOG_V(DEBUG, TAG, "data is empty..");
168
169         return CA_STATUS_FAILED;
170     }
171
172     // create thread data
173     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
174
175     if (message == NULL)
176     {
177         OIC_LOG_V(DEBUG, TAG, "memory error!!");
178         return CA_MEMORY_ALLOC_FAILED;
179     }
180     memset(message, 0, sizeof(u_queue_message_t));
181
182     message->msg = data;
183     message->size = sizeof(size);
184
185     // mutex lock
186     u_mutex_lock(thread->threadMutex);
187
188     // add thread data into list
189     u_queue_add_element(thread->dataQueue, message);
190
191     // notity the thread
192     u_cond_signal(thread->threadCond);
193
194     // mutex unlock
195     u_mutex_unlock(thread->threadMutex);
196
197     return CA_STATUS_OK;
198 }
199
200 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
201 {
202     if (thread == NULL)
203     {
204         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
205         return CA_STATUS_FAILED;
206     }
207
208     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
209
210     u_mutex_free(thread->threadMutex);
211     thread->threadMutex = NULL;
212     u_cond_free(thread->threadCond);
213     u_queue_delete(thread->dataQueue);
214
215     return CA_STATUS_OK;
216 }
217
218 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
219 {
220     if (thread == NULL)
221     {
222         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
223         return CA_STATUS_FAILED;
224     }
225
226     OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
227
228     if (!thread->isStop)
229     {
230         // mutex lock
231         u_mutex_lock(thread->threadMutex);
232
233         // set stop flag
234         thread->isStop = CA_TRUE;
235
236         // notity the thread
237         u_cond_signal(thread->threadCond);
238
239         u_cond_wait(thread->threadCond, thread->threadMutex);
240
241         // mutex unlock
242         u_mutex_unlock(thread->threadMutex);
243     }
244
245     return CA_STATUS_OK;
246 }
247