Merge "Implementation of connectivity abstraction feature release v0.2" into connecti...
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include "caqueueingthread.h"
26 #include "oic_malloc.h"
27 #include "logger.h"
28
29 #define TAG PCF("CA")
30
31 static void CAQueueingThreadBaseRoutine(void* treadValue)
32 {
33     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
34
35     CAQueueingThread_t* thread = (CAQueueingThread_t*) treadValue;
36
37     if (thread == NULL)
38     {
39         OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
40
41         return;
42     }
43
44     while (!thread->isStop)
45     {
46         // mutex lock
47         u_mutex_lock(thread->threadMutex);
48
49         // if queue is empty, thread will wait
50         if (u_queue_get_size(thread->dataQueue) <= 0)
51         {
52             OIC_LOG_V(DEBUG, TAG, "wait..");
53             // wait
54             u_cond_wait(thread->threadCond, thread->threadMutex);
55
56             OIC_LOG_V(DEBUG, TAG, "wake up..");
57         }
58
59         // mutex unlock
60         u_mutex_unlock(thread->threadMutex);
61
62         // check stop flag
63         if (thread->isStop)
64             continue;
65
66         // get data
67         u_queue_message_t* message = u_queue_get_element(thread->dataQueue);
68
69         void* data = message->msg;
70
71         // process data
72         thread->threadTask(data);
73
74         // free
75     }
76
77     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
78 }
79
80 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t* thread, u_thread_pool_t handle,
81         CAThreadTask task)
82 {
83     if (thread == NULL)
84     {
85         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
86         return CA_STATUS_FAILED;
87     }
88
89     if (handle == NULL)
90     {
91         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
92         return CA_STATUS_FAILED;
93     }
94
95     OIC_LOG_V(DEBUG, TAG, "thread initialize..");
96
97     memset(thread, 0, sizeof(CAQueueingThread_t));
98
99     // mutex init
100     u_mutex_init();
101
102     // set send thread data
103     thread->threadPool = handle;
104     thread->dataQueue = u_queue_create();
105     thread->threadMutex = u_mutex_new();
106     thread->threadCond = u_cond_new();
107     thread->isStop = CA_FALSE;
108     thread->threadTask = task;
109
110     return CA_STATUS_OK;
111 }
112
113 CAResult_t CAQueueingThreadStart(CAQueueingThread_t* thread)
114 {
115     if (thread == NULL)
116     {
117         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
118         return CA_STATUS_FAILED;
119     }
120
121     if (thread->threadPool == NULL)
122     {
123         OIC_LOG_V(DEBUG, TAG, "thread pool handle is empty..");
124         return CA_STATUS_FAILED;
125     }
126
127     CAResult_t res = u_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
128             thread);
129
130     if (res != CA_STATUS_OK)
131     {
132         OIC_LOG_V(DEBUG, TAG, "thread pool add task error(send thread).");
133         return res;
134     }
135
136     return res;
137 }
138
139 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t* thread, void* data, uint32_t size)
140 {
141     if (thread == NULL)
142     {
143         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
144         return CA_STATUS_FAILED;
145     }
146
147     if (data == NULL || size == 0)
148     {
149         OIC_LOG_V(DEBUG, TAG, "data is empty..");
150
151         return CA_STATUS_FAILED;
152     }
153
154     // create thread data
155     u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
156
157     if (message == NULL)
158     {
159         OIC_LOG_V(DEBUG, TAG, "memory error!!");
160         return CA_MEMORY_ALLOC_FAILED;
161     }
162     memset(message, 0, sizeof(u_queue_message_t));
163
164     message->msg = data;
165     message->size = sizeof(size);
166
167     // mutex lock
168     u_mutex_lock(thread->threadMutex);
169
170     // add thread data into list
171     u_queue_add_element(thread->dataQueue, message);
172
173     // notity the thread
174     u_cond_signal(thread->threadCond);
175
176     // mutex unlock
177     u_mutex_unlock(thread->threadMutex);
178
179     return CA_STATUS_OK;
180 }
181
182 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t* thread)
183 {
184     if (thread == NULL)
185     {
186         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
187         return CA_STATUS_FAILED;
188     }
189
190     OIC_LOG_V(DEBUG, TAG, "thread destroy..");
191
192     u_mutex_free(thread->threadMutex);
193     u_cond_free(thread->threadCond);
194     u_queue_delete(thread->dataQueue);
195
196     return CA_STATUS_OK;
197 }
198
199 CAResult_t CAQueueingThreadStop(CAQueueingThread_t* thread)
200 {
201     if (thread == NULL)
202     {
203         OIC_LOG_V(DEBUG, TAG, "thread instance is empty..");
204         return CA_STATUS_FAILED;
205     }
206
207     OIC_LOG_V(DEBUG, TAG, "thread stop request!!");
208
209     // mutex lock
210     u_mutex_lock(thread->threadMutex);
211
212     // set stop flag
213     thread->isStop = CA_TRUE;
214
215     // notity the thread
216     u_cond_signal(thread->threadCond);
217
218     // mutex unlock
219     u_mutex_unlock(thread->threadMutex);
220
221     return CA_STATUS_OK;
222 }
223