iotivity 0.9.0
[platform/upstream/iotivity.git] / service / soft-sensor-manager / SSMCore / src / Common / ThreadManager.cpp
1 /******************************************************************
2 *
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
4 *
5 *
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *      http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 ******************************************************************/
20 #include "ThreadManager.h"
21
22 CSimpleMutex::CSimpleMutex()
23 {
24 #if defined(WIN32)
25     InitializeCriticalSection(&m_criticalSection);
26 #elif defined(LINUX)
27     pthread_mutexattr_init(&m_mutexAttribute);
28     pthread_mutexattr_settype(&m_mutexAttribute, PTHREAD_MUTEX_RECURSIVE);
29     pthread_mutex_init(&m_mutex, &m_mutexAttribute);
30 #else
31 #error WIN32 or LINUX tag must be defined
32 #endif
33 }
34
35 CSimpleMutex::~CSimpleMutex()
36 {
37 #if defined(WIN32)
38     DeleteCriticalSection(&m_criticalSection);
39 #elif defined(LINUX)
40     pthread_mutex_destroy(&m_mutex);
41     pthread_mutexattr_destroy(&m_mutexAttribute);
42 #else
43 #error WIN32 or LINUX tag must be defined
44 #endif
45 }
46
47 void CSimpleMutex::lock()
48 {
49 #if defined(WIN32)
50     EnterCriticalSection(&m_criticalSection);
51 #elif defined(LINUX)
52     pthread_mutex_lock(&m_mutex);
53 #else
54 #error WIN32 or LINUX tag must be defined
55 #endif
56 }
57
58 void CSimpleMutex::unlock()
59 {
60 #if defined(WIN32)
61     LeaveCriticalSection(&m_criticalSection);
62 #elif defined(LINUX)
63     pthread_mutex_unlock(&m_mutex);
64 #else
65 #error WIN32 or LINUX tag must be defined
66 #endif
67 }
68
69
70 CSemaphore::CSemaphore()
71 {
72 }
73
74 CSemaphore::~CSemaphore()
75 {
76 }
77
78 SSMRESULT CSemaphore::initialize()
79 {
80 #if defined(WIN32)
81     hSemaphore = CreateSemaphore(NULL, 1, 1, NULL);
82
83     if (hSemaphore == NULL)
84         return SSM_E_FAIL;
85 #elif defined(LINUX)
86     if (sem_init(&hSemaphore, 0, 0) == -1)
87         return SSM_E_FAIL;
88 #else
89 #error WIN32 or LINUX tag must be defined
90 #endif
91     return SSM_S_OK;
92 }
93
94 SSMRESULT CSemaphore::destroy()
95 {
96 #if defined(WIN32)
97     if (CloseHandle(hSemaphore) == FALSE)
98         return SSM_E_FAIL;
99 #elif defined(LINUX)
100     if (sem_destroy(&hSemaphore) == -1)
101         return SSM_E_FAIL;
102 #else
103 #error WIN32 or LINUX tag must be defined
104 #endif
105     return SSM_S_OK;
106 }
107
108 SSMRESULT CSemaphore::take()
109 {
110 #if defined(WIN32)
111     if (WaitForSingleObject(hSemaphore, INFINITE) == WAIT_OBJECT_0)
112         return SSM_S_OK;
113
114     return SSM_E_FAIL;
115 #elif defined(LINUX)
116     if (sem_wait(&hSemaphore) == -1)
117         return SSM_E_FAIL;
118
119     return SSM_S_OK;
120 #else
121 #error WIN32 or LINUX tag must be defined
122 #endif
123 }
124
125 SSMRESULT CSemaphore::give()
126 {
127 #if defined(WIN32)
128     if (ReleaseSemaphore(hSemaphore, 1, NULL) != 0)
129         return SSM_S_OK;
130
131     return SSM_E_FAIL;
132 #elif defined(LINUX)
133     if (sem_post(&hSemaphore) == -1)
134         return SSM_E_FAIL;
135
136     return SSM_S_OK;
137 #else
138 #error WIN32 or LINUX tag must be defined
139 #endif
140 }
141
142
143 bool CWorkerThread::getTask(ClientEntry *clientEntry)
144 {
145     m_mtxClientEntry.lock();
146     //Check empty
147     if (m_ClientEntry.empty())
148     {
149         m_mtxClientEntry.unlock();
150         //Sleep if there are no more tasks
151         m_semTask.take();
152     }
153     else
154     {
155         m_mtxClientEntry.unlock();
156     }
157
158     //Check destroy
159     m_mtxThreadTerm.lock();
160     if (m_bThreadTerm == true)
161     {
162         m_mtxThreadTerm.unlock();
163         return false;
164     }
165     m_mtxThreadTerm.unlock();
166
167     m_mtxClientEntry.lock();
168     if (!m_ClientEntry.empty())
169     {
170         *clientEntry = m_ClientEntry.front();
171         m_ClientEntry.pop_front();
172     }
173     m_mtxClientEntry.unlock();
174
175     return true;
176 }
177
178 void CWorkerThread::worker()
179 {
180     ClientEntry clientEntry;
181
182     m_semTask.initialize();
183
184     //Thread Creation completed
185     m_semInit.give();
186
187     //Wait for any tasks
188     while (getTask(&clientEntry))
189     {
190         if (clientEntry.pClient)
191         {
192             clientEntry.pClient->onExecute(clientEntry.pArg);
193             clientEntry.pClient->onTerminate(clientEntry.pArg);
194         }
195         SAFE_RELEASE(clientEntry.pClient);
196     }
197
198     //Clean all remaining tasks
199     m_mtxClientEntry.lock();
200     //Remove all tasks from queue
201     for (std::list<ClientEntry>::iterator itor = m_ClientEntry.begin();
202          itor != m_ClientEntry.end(); ++itor)
203     {
204         ClientEntry clientEntry = *itor;
205         clientEntry.pClient->onTerminate(clientEntry.pArg);
206         SAFE_RELEASE(clientEntry.pClient);
207     }
208     m_ClientEntry.clear();
209     m_mtxClientEntry.unlock();
210
211     m_semTask.destroy();
212
213     m_semTerm.give();
214 }
215
216 SSMRESULT CWorkerThread::initialize()
217 {
218     SSMRESULT res = SSM_E_FAIL;
219
220     //Create thread and wait for jobs
221 #if defined(WIN32)
222     m_hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)_worker,
223                              this, 0, NULL);
224
225     if (m_hThread == NULL)
226     {
227         return SSM_E_FAIL;
228     }
229 #elif defined(LINUX)
230     if (pthread_create(&m_hThread, NULL, (void *(*)(void *))_worker, (void *)this) != 0)
231     {
232         return SSM_E_FAIL;
233     }
234 #else
235 #error WIN32 or LINUX tag must be defined
236 #endif
237     //Wait till thread created
238     SSM_CLEANUP_ASSERT(m_semInit.take());
239     SSM_CLEANUP_ASSERT(m_semInit.destroy());
240
241 CLEANUP:
242     return res;
243 }
244
245 SSMRESULT CWorkerThread::terminate()
246 {
247     SSMRESULT res = SSM_E_FAIL;
248
249     //Let worker destroyed
250     m_mtxThreadTerm.lock();
251     m_bThreadTerm = true;
252     m_mtxThreadTerm.unlock();
253
254     SSM_CLEANUP_ASSERT(m_semTask.give());
255
256     SSM_CLEANUP_ASSERT(m_semTerm.take());
257     SSM_CLEANUP_ASSERT(m_semTerm.destroy());
258
259 #if defined(WIN32)
260     if (m_hThread != NULL)
261     {
262         CloseHandle(m_hThread);
263     }
264 #elif defined(LINUX)
265     pthread_detach(m_hThread);
266 #else
267 #error WIN32 or LINUX tag must be defined
268 #endif
269     res = SSM_S_OK;
270
271 CLEANUP:
272     return res;
273 }
274
275 SSMRESULT CWorkerThread::finalConstruct()
276 {
277     SSMRESULT   res = SSM_E_FAIL;
278
279     m_bThreadTerm = false;
280
281     SSM_CLEANUP_ASSERT(m_semInit.initialize());
282     SSM_CLEANUP_ASSERT(m_semTerm.initialize());
283
284 CLEANUP:
285     return res;
286 }
287
288 void CWorkerThread::finalRelease()
289 {
290 }
291
292 SSMRESULT CWorkerThread::addTask(IThreadClient *pThreadClient, void *param)
293 {
294     ClientEntry clientEntry;
295
296     pThreadClient->addRef();
297     clientEntry.pClient = pThreadClient;
298     clientEntry.pArg = param;
299
300     m_mtxClientEntry.lock();
301     if (m_ClientEntry.empty())
302     {
303         m_semTask.give();
304     }
305     m_ClientEntry.push_back(clientEntry);
306     //Let the task worker know, we just added task
307     m_mtxClientEntry.unlock();
308
309     return SSM_S_OK;
310 }
311
312
313 SSMRESULT CTasker::finalConstruct()
314 {
315     SSMRESULT   res = SSM_E_FAIL;
316
317     SSM_CLEANUP_ASSERT(CreateGlobalInstance(OID_IThreadPool, (IBase **)&m_pThreadPool));
318
319     SSM_CLEANUP_ASSERT(m_pThreadPool->createWorkerThread(&m_pWorkerThread));
320
321 CLEANUP:
322     return res;
323 }
324
325 void CTasker::finalRelease()
326 {
327 }
328
329 SSMRESULT CTasker::addTask(IThreadClient *pThreadClient, void *param)
330 {
331     return m_pWorkerThread->addTask(pThreadClient, param);
332 }
333
334
335 SSMRESULT CThreadPool::finalConstruct()
336 {
337     return SSM_S_OK;
338 }
339
340 void CThreadPool::finalRelease()
341 {
342     for (std::vector<IWorkerThread *>::iterator itor = m_lstWorkerThread.begin();
343          itor != m_lstWorkerThread.end(); ++itor)
344     {
345         (*itor)->release();
346     }
347 }
348
349 SSMRESULT CThreadPool::createWorkerThread(OUT IWorkerThread **ppWorkerThread)
350 {
351     SSMRESULT res = SSM_E_FAIL;
352
353     IWorkerThread       *pWorkerThread = NULL;
354
355     SSM_CLEANUP_NULL_ASSERT(ppWorkerThread);
356
357     SSM_CLEANUP_ASSERT(CreateInstance(OID_IWorkerThread, (IBase **)&pWorkerThread));
358     SSM_CLEANUP_ASSERT(pWorkerThread->initialize());
359     SSM_CLEANUP_ASSERT(pWorkerThread->queryInterface(OID_IWorkerThread, (IBase **)ppWorkerThread));
360     m_lstWorkerThread.push_back(pWorkerThread);
361
362 CLEANUP:
363     return res;
364 }
365
366 SSMRESULT CThreadPool::destroyThreadPool()
367 {
368     SSMRESULT   res = SSM_E_FAIL;
369
370     for (std::vector<IWorkerThread *>::iterator itor = m_lstWorkerThread.begin();
371          itor != m_lstWorkerThread.end(); ++itor)
372     {
373         SSM_CLEANUP_ASSERT((*itor)->terminate());
374     }
375
376     res = SSM_S_OK;
377
378 CLEANUP:
379     return res;
380 }