Merge branch 'master' into connectivity-abstraction
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <sys/syscall.h>
26 #include <sys/types.h>
27
28 #include "caqueueingthread.h"
29 #include "oic_malloc.h"
30 #include "logger.h"
31
32 #define TAG PCF("CA")
33
34 static void CAQueueingThreadBaseRoutine(void *threadValue)
35 {
36     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
37
38     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
39
40     if (thread == NULL)
41     {
42         OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
43
44         return;
45     }
46
47     while (!thread->isStop)
48     {
49         // mutex lock
50         u_mutex_lock(thread->threadMutex);
51
52         // if queue is empty, thread will wait
53         if (u_queue_get_size(thread->dataQueue) <= 0)
54         {
55             OIC_LOG_V(DEBUG, TAG, "wait..");
56             // wait
57             u_cond_wait(thread->threadCond, thread->threadMutex);
58
59             OIC_LOG_V(DEBUG, TAG, "wake up..");
60         }
61
62         // mutex unlock
63         u_mutex_unlock(thread->threadMutex);
64
65         // check stop flag
66         if (thread->isStop)
67             continue;
68
69         // get data
70         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
71
72         void *data = message->msg;
73
74         // process data
75         thread->threadTask(data);
76
77         // free
78     }
79
80     u_cond_signal(thread->threadCond);
81
82     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
83 }
84
85 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, u_thread_pool_t handle,
86                                       CAThreadTask task)
87 {
88     if (thread == NULL)
89     {
90         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
91         return CA_STATUS_FAILED;
92     }
93
94     if (handle == NULL)
95     {
96         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
97         return CA_STATUS_FAILED;
98     }
99
100     OIC_LOG_V(DEBUG, TAG, "thread initialize..");
101
102     memset(thread, 0, sizeof(CAQueueingThread_t));
103
104     // mutex init
105     u_mutex_init();
106
107     // set send thread data
108     thread->threadPool = handle;
109     thread->dataQueue = u_queue_create();
110     thread->threadMutex = u_mutex_new();
111     thread->threadCond = u_cond_new();
112     thread->isStop = CA_FALSE;
113     thread->threadTask = task;
114
115     return CA_STATUS_OK;
116 }
117
118 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
119 {
120     if (thread == NULL)
121     {
122         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
123         return CA_STATUS_FAILED;
124     }
125
126     if (thread->threadPool == NULL)
127     {
128         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
129         return CA_STATUS_FAILED;
130     }
131
132     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
133                                             thread);
134
135     if (res != CA_STATUS_OK)
136     {
137         OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
138         return res;
139     }
140
141     return res;
142 }
143
144 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
145 {
146     if (thread == NULL)
147     {
148         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
149         return CA_STATUS_FAILED;
150     }
151
152     if (data == NULL || size == 0)
153     {
154         OIC_LOG_V(DEBUG, TAG, "data is empty..");
155
156         return CA_STATUS_FAILED;
157     }
158
159     // create thread data
160     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
161
162     if (message == NULL)
163     {
164         OIC_LOG_V(DEBUG, TAG, "memory error!!");
165         return CA_MEMORY_ALLOC_FAILED;
166     }
167     memset(message, 0, sizeof(u_queue_message_t));
168
169     message->msg = data;
170     message->size = sizeof(size);
171
172     // mutex lock
173     u_mutex_lock(thread->threadMutex);
174
175     // add thread data into list
176     u_queue_add_element(thread->dataQueue, message);
177
178     // notity the thread
179     u_cond_signal(thread->threadCond);
180
181     // mutex unlock
182     u_mutex_unlock(thread->threadMutex);
183
184     return CA_STATUS_OK;
185 }
186
187 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
188 {
189     if (thread == NULL)
190     {
191         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
192         return CA_STATUS_FAILED;
193     }
194
195     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
196
197     u_mutex_free(thread->threadMutex);
198     thread->threadMutex = NULL;
199     u_cond_free(thread->threadCond);
200     u_queue_delete(thread->dataQueue);
201
202     return CA_STATUS_OK;
203 }
204
205 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
206 {
207     if (thread == NULL)
208     {
209         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
210         return CA_STATUS_FAILED;
211     }
212
213     OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
214
215     // mutex lock
216     u_mutex_lock(thread->threadMutex);
217
218     // set stop flag
219     thread->isStop = CA_TRUE;
220
221     // notity the thread
222     u_cond_signal(thread->threadCond);
223
224     u_cond_wait(thread->threadCond, thread->threadMutex);    
225
226     // mutex unlock
227     u_mutex_unlock(thread->threadMutex);
228
229     return CA_STATUS_OK;
230 }
231