0894b46bc2443d2b3369707cfd37cae8d8cb617c
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (thread == NULL)
41     {
42         OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         u_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG_V(DEBUG, TAG, "wait..");
56
57             // wait
58             u_cond_wait(thread->threadCond, thread->threadMutex);
59
60             OIC_LOG_V(DEBUG, TAG, "wake up..");
61         }
62
63         // mutex unlock
64         u_mutex_unlock(thread->threadMutex);
65
66         // check stop flag
67         if (thread->isStop)
68             continue;
69
70         // get data
71         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
72
73         void *data = message->msg;
74
75         // process data
76         thread->threadTask(data);
77
78         // free
79         OICFree(message);
80     }
81
82     u_cond_signal(thread->threadCond);
83
84     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
85 }
86
87 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
88                                       CAThreadTask task)
89 {
90     if (thread == NULL)
91     {
92         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
93         return CA_STATUS_FAILED;
94     }
95
96     if (handle == NULL)
97     {
98         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
99         return CA_STATUS_FAILED;
100     }
101
102     OIC_LOG_V(DEBUG, TAG, "thread initialize..");
103
104     memset(thread, 0, sizeof(CAQueueingThread_t));
105
106     // mutex init
107     u_mutex_init();
108
109     // set send thread data
110     thread->threadPool = handle;
111     thread->dataQueue = u_queue_create();
112     thread->threadMutex = u_mutex_new();
113     thread->threadCond = u_cond_new();
114     thread->isStop = CA_TRUE;
115     thread->threadTask = task;
116
117     return CA_STATUS_OK;
118 }
119
120 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
121 {
122     if (thread == NULL)
123     {
124         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
125         return CA_STATUS_FAILED;
126     }
127
128     if (thread->threadPool == NULL)
129     {
130         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
131         return CA_STATUS_FAILED;
132     }
133
134     if (CA_FALSE == thread->isStop) //Queueing thread already running
135     {
136         OIC_LOG_V(DEBUG, TAG, "queueing thread already running..");
137         return CA_STATUS_OK;
138     }
139
140     thread->isStop = CA_FALSE;
141     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
142                                             thread);
143     if (res != CA_STATUS_OK)
144     {
145         OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
146         thread->isStop = CA_TRUE;
147         return res;
148     }
149
150     return res;
151 }
152
153 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
154 {
155     if (thread == NULL)
156     {
157         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
158         return CA_STATUS_FAILED;
159     }
160
161     if (data == NULL || size == 0)
162     {
163         OIC_LOG_V(DEBUG, TAG, "data is empty..");
164
165         return CA_STATUS_FAILED;
166     }
167
168     // create thread data
169     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
170
171     if (message == NULL)
172     {
173         OIC_LOG_V(DEBUG, TAG, "memory error!!");
174         return CA_MEMORY_ALLOC_FAILED;
175     }
176     memset(message, 0, sizeof(u_queue_message_t));
177
178     message->msg = data;
179     message->size = sizeof(size);
180
181     // mutex lock
182     u_mutex_lock(thread->threadMutex);
183
184     // add thread data into list
185     u_queue_add_element(thread->dataQueue, message);
186
187     // notity the thread
188     u_cond_signal(thread->threadCond);
189
190     // mutex unlock
191     u_mutex_unlock(thread->threadMutex);
192
193     return CA_STATUS_OK;
194 }
195
196 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
197 {
198     if (thread == NULL)
199     {
200         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
201         return CA_STATUS_FAILED;
202     }
203
204     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
205
206     u_mutex_free(thread->threadMutex);
207     thread->threadMutex = NULL;
208     u_cond_free(thread->threadCond);
209     u_queue_delete(thread->dataQueue);
210
211     return CA_STATUS_OK;
212 }
213
214 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
215 {
216     if (thread == NULL)
217     {
218         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
219         return CA_STATUS_FAILED;
220     }
221
222     OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
223
224     if (!thread->isStop)
225     {
226         // mutex lock
227         u_mutex_lock(thread->threadMutex);
228
229         // set stop flag
230         thread->isStop = CA_TRUE;
231
232         // notity the thread
233         u_cond_signal(thread->threadCond);
234
235         u_cond_wait(thread->threadCond, thread->threadMutex);
236
237         // mutex unlock
238         u_mutex_unlock(thread->threadMutex);
239     }
240
241     return CA_STATUS_OK;
242 }
243