Merge branch 'connectivity-abstraction' to master
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (thread == NULL)
41     {
42         OIC_LOG(ERROR, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         u_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG(DEBUG, TAG, "wait..");
56
57             // wait
58             u_cond_wait(thread->threadCond, thread->threadMutex);
59
60             OIC_LOG(DEBUG, TAG, "wake up..");
61         }
62
63         // mutex unlock
64         u_mutex_unlock(thread->threadMutex);
65
66         // check stop flag
67         if (thread->isStop)
68         {
69             continue;
70         }
71
72         // get data
73         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
74         if (message == NULL)
75         {
76             continue;
77         }
78
79         // process data
80         thread->threadTask(message->msg);
81
82         // free
83         if (thread->destroy != NULL)
84         {
85             thread->destroy(message->msg, message->size);
86         }
87         else
88         {
89             OICFree(message->msg);
90         }
91
92         OICFree(message);
93     }
94
95     // remove all remained list data.
96     while (u_queue_get_size(thread->dataQueue) > 0)
97     {
98         // get data
99         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
100
101         // free
102         if(message != NULL) {
103             if (thread->destroy != NULL)
104             {
105                 thread->destroy(message->msg, message->size);
106             }
107             else
108             {
109                 OICFree(message->msg);
110             }
111
112             OICFree(message);
113         }
114     }
115
116     u_cond_signal(thread->threadCond);
117
118     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
119 }
120
121 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
122                                       CAThreadTask task, CADataDestroyFunction destroy)
123 {
124     if (thread == NULL)
125     {
126         OIC_LOG(ERROR, TAG, "thread instance is empty..");
127         return CA_STATUS_FAILED;
128     }
129
130     if (handle == NULL)
131     {
132         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
133         return CA_STATUS_FAILED;
134     }
135
136     OIC_LOG(DEBUG, TAG, "thread initialize..");
137
138     // set send thread data
139     thread->threadPool = handle;
140     thread->dataQueue = u_queue_create();
141     thread->threadMutex = u_mutex_new();
142     thread->threadCond = u_cond_new();
143     thread->isStop = true;
144     thread->threadTask = task;
145     thread->destroy = destroy;
146
147     return CA_STATUS_OK;
148 }
149
150 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
151 {
152     if (thread == NULL)
153     {
154         OIC_LOG(ERROR, TAG, "thread instance is empty..");
155         return CA_STATUS_FAILED;
156     }
157
158     if (thread->threadPool == NULL)
159     {
160         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
161         return CA_STATUS_FAILED;
162     }
163
164     if (false == thread->isStop) //Queueing thread already running
165     {
166         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
167         return CA_STATUS_OK;
168     }
169
170     // mutex lock
171     u_mutex_lock(thread->threadMutex);
172     thread->isStop = false;
173     // mutex unlock
174     u_mutex_unlock(thread->threadMutex);
175
176     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
177                                             thread);
178     if (res != CA_STATUS_OK)
179     {
180         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
181     }
182
183     return res;
184 }
185
186 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
187 {
188     if (thread == NULL)
189     {
190         OIC_LOG(ERROR, TAG, "thread instance is empty..");
191         return CA_STATUS_FAILED;
192     }
193
194     if (data == NULL || size == 0)
195     {
196         OIC_LOG(ERROR, TAG, "data is empty..");
197
198         return CA_STATUS_FAILED;
199     }
200
201     // create thread data
202     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
203
204     if (message == NULL)
205     {
206         OIC_LOG(ERROR, TAG, "memory error!!");
207         return CA_MEMORY_ALLOC_FAILED;
208     }
209
210     message->msg = data;
211     message->size = size;
212
213     // mutex lock
214     u_mutex_lock(thread->threadMutex);
215
216     // add thread data into list
217     u_queue_add_element(thread->dataQueue, message);
218
219     // notity the thread
220     u_cond_signal(thread->threadCond);
221
222     // mutex unlock
223     u_mutex_unlock(thread->threadMutex);
224
225     return CA_STATUS_OK;
226 }
227
228 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
229 {
230     if (thread == NULL)
231     {
232         OIC_LOG(ERROR, TAG, "thread instance is empty..");
233         return CA_STATUS_FAILED;
234     }
235
236     OIC_LOG(DEBUG, TAG, "thread destroy..");
237
238     u_mutex_free(thread->threadMutex);
239     thread->threadMutex = NULL;
240     u_cond_free(thread->threadCond);
241     u_queue_delete(thread->dataQueue);
242
243     return CA_STATUS_OK;
244 }
245
246 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
247 {
248     if (thread == NULL)
249     {
250         OIC_LOG(ERROR, TAG, "thread instance is empty..");
251         return CA_STATUS_FAILED;
252     }
253
254     OIC_LOG(DEBUG, TAG, "thread stop request!!");
255
256     if (!thread->isStop)
257     {
258         // mutex lock
259         u_mutex_lock(thread->threadMutex);
260
261         // set stop flag
262         thread->isStop = true;
263
264         // notify the thread
265         u_cond_signal(thread->threadCond);
266
267         u_cond_wait(thread->threadCond, thread->threadMutex);
268
269         // mutex unlock
270         u_mutex_unlock(thread->threadMutex);
271     }
272
273     return CA_STATUS_OK;
274 }