* wrapping DLT calls in a new Class because of performance, codesize and lazyness...
[profile/ivi/audiomanager.git] / PluginRoutingInterfaceAsync / src / RoutingSenderAsync.cpp
1 /**
2  * Copyright (C) 2011, BMW AG
3  *
4  * GeniviAudioMananger DbusPlugin
5  *
6  * \file RoutingSender.cpp
7  *
8  * \date 20-Oct-2011 3:42:04 PM
9  * \author Christian Mueller (christian.ei.mueller@bmw.de)
10  *
11  * \section License
12  * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13  * Copyright (C) 2011, BMW AG Christian Mueller  Christian.ei.mueller@bmw.de
14  *
15  * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17  * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18  * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19  * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20  * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21  * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
22  *
23  */
24
25 #include "RoutingSenderAsyn.h"
26 #include "DLTWrapper.h"
27 #include <algorithm>
28 #include <vector>
29 #include <poll.h>
30 #include <errno.h>
31 #include <time.h>
32 #include <assert.h>
33 #include <sstream>
34 #include <string>
35 #include <dbus/dbus.h>
36
37 using namespace am;
38
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
40
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
42 {
43     return (new AsyncRoutingSender());
44 }
45
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
47 {
48     delete routingSendInterface;
49 }
50
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
57
58 void* AsyncRoutingSender::InterruptEvents(void *data)
59 {
60     RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
61     DBusError err;
62     DBusMessage* msg;
63     DBusConnection* conn;
64     dbus_error_init(&err);
65     conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
66     dbus_uint32_t serial = 0;
67     DBusMessage* reply;
68     DBusMessageIter args;
69     dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
70
71     while (dbus_connection_read_write_dispatch(conn, -1))
72     {
73         dbus_connection_read_write(conn, 0);
74         msg = dbus_connection_pop_message(conn);
75
76         if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
77         {
78             am_connectionID_t connectionID;
79             am_timeSync_t delay;
80             dbus_message_iter_init(msg, &args);
81             dbus_message_iter_get_basic(&args,(void*) &connectionID);
82             dbus_message_iter_next(&args);
83             dbus_message_iter_get_basic(&args,(void*) &delay);
84             reply = dbus_message_new_method_return(msg);
85             dbus_message_iter_init_append(reply, &args);
86             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
87             dbus_connection_send(conn, reply, &serial);
88             shadow->hookTimingInformationChanged(connectionID,delay);
89             dbus_message_unref(reply);
90         }
91         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
92         {
93             am_sinkID_t sinkID;
94             am_Availability_s availability;
95             dbus_message_iter_init(msg, &args);
96             dbus_message_iter_get_basic(&args,(void*) &sinkID);
97             reply = dbus_message_new_method_return(msg);
98             dbus_message_iter_init_append(reply, &args);
99             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
100             dbus_connection_send(conn, reply, &serial);
101             shadow->hookSinkAvailablityStatusChange(sinkID,availability);
102             dbus_message_unref(reply);
103         }
104         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
105         {
106             am_sourceID_t sourceID;
107             am_Availability_s availability;
108             dbus_message_iter_init(msg, &args);
109             dbus_message_iter_get_basic(&args,(void*) &sourceID);
110             reply = dbus_message_new_method_return(msg);
111             dbus_message_iter_init_append(reply, &args);
112             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
113             dbus_connection_send(conn, reply, &serial);
114             shadow->hookSourceAvailablityStatusChange(sourceID,availability);
115             dbus_message_unref(reply);
116         }
117         else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
118         {
119             am_sourceID_t sourceID;
120
121             am_InterruptState_e state=IS_MIN;
122             dbus_message_iter_init(msg, &args);
123             dbus_message_iter_get_basic(&args,(void*) &sourceID);
124             reply = dbus_message_new_method_return(msg);
125             dbus_message_iter_init_append(reply, &args);
126             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
127             dbus_connection_send(conn, reply, &serial);
128             shadow->hookInterruptStatusChange(sourceID,state);
129             dbus_message_unref(reply);
130         }
131         dbus_connection_flush(conn);
132     }
133     return NULL;
134 }
135
136 void *WorkerThreadPool::WorkerThread(void* data)
137 {
138     threadInfo_s *myInfo=(threadInfo_s*)data;
139     while (1)
140     {
141         sem_wait(&myInfo->block);
142         pthread_mutex_lock(&mBlockingMutex);
143         Worker* actWorker=myInfo->worker;
144         pthread_mutex_unlock(&mBlockingMutex);
145         actWorker->setCancelSempaphore(&myInfo->cancel);
146         actWorker->start2work();
147         actWorker->pPool->finishedWork(myInfo->threadID);
148     }
149     return NULL;
150 }
151
152 WorkerThreadPool::WorkerThreadPool(int numThreads):
153 mNumThreads(numThreads)
154 {
155     int workerID=0;
156     mListWorkers.resize(mNumThreads);
157     for (int i=0;i<mNumThreads;i++)
158     {
159         sem_init(&mListWorkers[i].block,NULL,NULL);
160         sem_init(&mListWorkers[i].cancel,NULL,NULL);
161         mListWorkers[i].busy=false;
162         mListWorkers[i].workerID=++workerID;
163         pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
164     }
165 }
166
167 int16_t WorkerThreadPool::startWork(Worker *worker)
168 {
169     pthread_mutex_lock(&mBlockingMutex);
170     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
171     for(;it!=mListWorkers.end();++it)
172     {
173         if(!it->busy)
174         {
175             it->worker=worker;
176             it->busy=true;
177             pthread_mutex_unlock(&mBlockingMutex);
178             sem_post(&it->block);
179             return ((int)it->workerID);
180         }
181     }
182     pthread_mutex_unlock(&mBlockingMutex);
183     return (-1);
184 }
185
186 bool WorkerThreadPool::cancelWork(int workerID)
187 {
188     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
189     for(;it!=mListWorkers.end();++it)
190     {
191         if(it->workerID==workerID && it->busy)
192         {
193             sem_post(&it->cancel);
194             return (true);
195         }
196     }
197     return (false);
198 }
199
200 void WorkerThreadPool::finishedWork(pthread_t threadID)
201 {
202     pthread_mutex_lock(&mBlockingMutex);
203     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
204     for(;it!=mListWorkers.end();++it)
205     {
206         if(it->threadID==threadID)
207         {
208             it->busy=false;
209             delete it->worker;
210             break;
211         }
212     }
213     pthread_mutex_unlock(&mBlockingMutex);
214 }
215
216 WorkerThreadPool::~WorkerThreadPool()
217 {
218     for (int i=0;i<mNumThreads;i++)
219     {
220         pthread_cancel(mListWorkers[i].threadID);
221     }
222 }
223
224 Worker::Worker(WorkerThreadPool *pool):
225 pPool(pool), //
226         mCancelSem()
227         {
228         }
229
230         void Worker::setCancelSempaphore(sem_t* cancel)
231         {
232             mCancelSem=cancel;
233         }
234
235         bool Worker::timedWait(timespec timer)
236         {
237             timespec temp;
238             if(clock_gettime(0, &temp)==-1)
239             {
240                 logError("Worker::timedWait error on getting time");
241             }
242             temp.tv_nsec+=timer.tv_nsec;
243             temp.tv_sec+=timer.tv_sec;
244             //if(sem_wait(mCancelSem)==-1)
245         if (sem_timedwait(mCancelSem,&temp)==-1)
246         {
247             //a timeout happened
248             if(errno == ETIMEDOUT)
249             {
250                 logError("Worker::timedWait timeout waiting error");
251                 return (false);
252             }
253             else //failure in waiting, nevertheless, we quit the thread...
254             {
255                 logError("Worker::timedWait semaphore waiting error");
256                 return (true);
257             }
258         }
259         logError("Worker::timedWait semaphore waiting error");
260         this->cancelWork();
261         return (true);
262     }
263
264     AsyncRoutingSender::AsyncRoutingSender():
265     mShadow(), //
266     mReceiveInterface(0), //
267         mDomains(createDomainTable()), //
268         mSinks(createSinkTable()), //
269 mSources ( createSourceTable ( ) ), //
270 mGateways ( createGatewayTable ( ) ) , //
271 mMapHandleWorker ( ), //
272 mMapConnectionIDRoute(),//
273 mPool(10)
274 {
275 }
276
277 AsyncRoutingSender::~AsyncRoutingSender()
278 {
279 }
280
281 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
282 {
283     //first, create the Shadow:
284     assert(routingreceiveinterface!=0);
285     mReceiveInterface = routingreceiveinterface;
286     mShadow.setRoutingInterface(routingreceiveinterface);
287 }
288
289 void AsyncRoutingSender::routingInterfacesReady()
290 {
291     assert(mReceiveInterface!=0);
292     am_Error_e eCode;
293     //first register the domains
294     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
295     for (; domainIter != mDomains.end(); ++domainIter)
296     {
297         am_domainID_t domainID;
298         if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
299         {
300             logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode);
301         }
302         domainIter->domainID = domainID;
303     }
304
305     //then sources
306     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
307     for (; sourceIter != mSources.end(); ++sourceIter)
308     {
309         am_sourceID_t sourceID;
310         //set the correct domainID
311         sourceIter->domainID = mDomains[0].domainID;
312         if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
313         {
314             logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode);
315         }
316     }
317
318     //sinks
319     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
320     for (; sinkIter != mSinks.end(); ++sinkIter)
321     {
322         am_sinkID_t sinkID;
323         //set the correct domainID
324         sinkIter->domainID = mDomains[0].domainID;
325         if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
326         {
327             logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode);
328         }
329     }
330
331     //gateways
332 //      std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
333 //      for(;gatewayIter!=mGateways.end();++gatewayIter)
334 //      {
335 //              am_gatewayID_t gatewayID;
336 //              gatewayIter->domainSinkID=mDomains[0].domainID;
337 //              gatewayIter->domainSourceID=mDomains[1].domainID;
338 //              gatewayIter->controlDomainID=mDomains[0].domainID;
339 //              if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
340 //              {
341 //                      logError("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with",eCode));
342 //              }
343 //              gatewayIter->gatewayID=gatewayID;
344 //      }
345
346     //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
347     //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
348     //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
349 }
350
351 void AsyncRoutingSender::routingInterfacesRundown()
352 {
353     assert(mReceiveInterface!=0);
354 }
355
356 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
357 {
358     assert(mReceiveInterface!=0);
359     assert(handle.handle!=0);
360
361     //first check if we know the handle
362     pthread_mutex_lock(&mMapHandleWorkerMutex);
363     std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
364     if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
365     {
366         pthread_mutex_unlock(&mMapHandleWorkerMutex);
367         return (E_NON_EXISTENT);
368     }
369     pthread_mutex_unlock(&mMapHandleWorkerMutex);
370
371     //ok, cancel the action:
372     if (mPool.cancelWork(iter->second))
373         return (E_OK);
374     return (E_UNKNOWN);
375 }
376
377 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
378 {
379     assert(mReceiveInterface!=0);
380     assert(handle.handle!=0);
381     assert(handle.handleType==H_CONNECT);
382     assert(connectionID!=0);
383     assert(sinkID!=0);
384     assert(sourceID!=0);
385
386     //check if we can take the job
387     am_Sink_s sink;
388     am_Source_s source;
389     int16_t work = -1;
390
391     //find the sink
392     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
393     for (; sinkIter != mSinks.end(); ++sinkIter)
394     {
395         if (sinkIter->sinkID == sinkID)
396         {
397             sink = *sinkIter;
398             break;
399         }
400     }
401     if (sinkIter == mSinks.end())
402         return (E_NON_EXISTENT); //not found!
403
404     //find the source
405     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
406     for (; sourceIter != mSources.end(); ++sourceIter)
407     {
408         if (sourceIter->sourceID == sourceID)
409         {
410             source = *sourceIter;
411             break;
412         }
413     }
414     if (sourceIter == mSources.end())
415         return (E_NON_EXISTENT); //not found!
416
417     //check the format
418     if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end())
419         return (E_WRONG_FORMAT);
420     if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end())
421         return (E_WRONG_FORMAT);
422
423     //the operation is ok, lets create a worker, assign it to a task in the task pool
424     asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
425     if ((work = mPool.startWork(worker)) == -1)
426     {
427         logError("AsyncRoutingSender::asyncConnect not enough threads!");
428         delete worker;
429         return (E_NOT_POSSIBLE);
430     }
431
432     //save the handle related to the workerID
433     pthread_mutex_lock(&mMapHandleWorkerMutex);
434     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
435     pthread_mutex_unlock(&mMapHandleWorkerMutex);
436
437     return (E_OK);
438 }
439
440 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
441 {
442     assert(mReceiveInterface!=0);
443     assert(handle.handle!=0);
444     assert(handle.handleType==H_DISCONNECT);
445     assert(connectionID!=0);
446
447     //check if we can take the job
448     int16_t work = -1;
449
450     pthread_mutex_lock(&mMapConnectionMutex);
451     if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
452     {
453         pthread_mutex_unlock(&mMapConnectionMutex);
454         return (E_NON_EXISTENT);
455     }
456     pthread_mutex_unlock(&mMapConnectionMutex);
457
458     //the operation is ok, lets create a worker, assign it to a task in the task pool
459     asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
460     if ((work = mPool.startWork(worker)) == -1)
461     {
462         logError("AsyncRoutingSender::asyncDisconnect not enough threads!");
463         delete worker;
464         return (E_NOT_POSSIBLE);
465     }
466
467     //save the handle related to the workerID
468     pthread_mutex_lock(&mMapHandleWorkerMutex);
469     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
470     pthread_mutex_unlock(&mMapHandleWorkerMutex);
471
472     return (E_OK);
473 }
474
475 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
476 {
477     assert(mReceiveInterface!=0);
478     assert(handle.handle!=0);
479     assert(handle.handleType==H_SETSINKVOLUME);
480     assert(sinkID!=0);
481
482     //check if we can take the job
483     am_Sink_s sink;
484     int16_t work = -1;
485
486     //find the sink
487     pthread_mutex_lock(&mSinksMutex);
488     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
489     for (; sinkIter != mSinks.end(); ++sinkIter)
490     {
491         if (sinkIter->sinkID == sinkID)
492         {
493             sink = *sinkIter;
494             break;
495         }
496     }
497     pthread_mutex_unlock(&mSinksMutex);
498     if (sinkIter == mSinks.end())
499         return (E_NON_EXISTENT); //not found!
500
501     asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
502     if ((work = mPool.startWork(worker)) == -1)
503     {
504         logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!");
505         delete worker;
506         return (E_NOT_POSSIBLE);
507     }
508
509     //save the handle related to the workerID
510     pthread_mutex_lock(&mMapHandleWorkerMutex);
511     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
512     pthread_mutex_unlock(&mMapHandleWorkerMutex);
513
514     return (E_OK);
515 }
516
517 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
518 {
519     assert(mReceiveInterface!=0);
520     assert(handle.handle!=0);
521     assert(handle.handleType==H_SETSOURCEVOLUME);
522     assert(sourceID!=0);
523
524     //check if we can take the job
525     am_Source_s source;
526     int16_t work = -1;
527
528     //find the sink
529     pthread_mutex_lock(&mSourcesMutex);
530     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
531     for (; sourceIter != mSources.end(); ++sourceIter)
532     {
533         if (sourceIter->sourceID == sourceID)
534         {
535             source = *sourceIter;
536             break;
537         }
538     }
539     pthread_mutex_unlock(&mSourcesMutex);
540     if (sourceIter == mSources.end())
541         return (E_NON_EXISTENT); //not found!
542
543     asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
544     if ((work = mPool.startWork(worker)) == -1)
545     {
546         logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!");
547         delete worker;
548         return (E_NOT_POSSIBLE);
549     }
550
551     //save the handle related to the workerID
552     pthread_mutex_lock(&mMapHandleWorkerMutex);
553     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
554     pthread_mutex_unlock(&mMapHandleWorkerMutex);
555
556     return (E_OK);
557 }
558
559 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
560 {
561     assert(mReceiveInterface!=0);
562     assert(handle.handle!=0);
563     assert(handle.handleType==H_SETSOURCESTATE);
564     assert(sourceID!=0);
565
566     //check if we can take the job
567     am_Source_s source;
568     int16_t work = -1;
569
570     //find the source
571     pthread_mutex_lock(&mSourcesMutex);
572     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
573     for (; sourceIter != mSources.end(); ++sourceIter)
574     {
575         if (sourceIter->sourceID == sourceID)
576         {
577             source = *sourceIter;
578             break;
579         }
580     }
581     pthread_mutex_unlock(&mSourcesMutex);
582     if (sourceIter == mSources.end())
583         return (E_NON_EXISTENT); //not found!
584
585     asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
586     if ((work = mPool.startWork(worker)) == -1)
587     {
588         logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
589         delete worker;
590         return (E_NOT_POSSIBLE);
591     }
592
593     //save the handle related to the workerID
594     pthread_mutex_lock(&mMapHandleWorkerMutex);
595     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
596     pthread_mutex_unlock(&mMapHandleWorkerMutex);
597
598     return (E_OK);
599 }
600
601 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_sinkID_t sinkID, const am_SoundProperty_s & soundProperty)
602 {
603     assert(mReceiveInterface!=0);
604     assert(handle.handle!=0);
605     assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
606     assert(sinkID!=0);
607
608     //check if we can take the job
609     am_Sink_s sink;
610     int16_t work = -1;
611
612     //find the sink
613     pthread_mutex_lock(&mSinksMutex);
614     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
615     for (; sinkIter != mSinks.end(); ++sinkIter)
616     {
617         if (sinkIter->sinkID == sinkID)
618         {
619             sink = *sinkIter;
620             break;
621         }
622     }
623     pthread_mutex_unlock(&mSinksMutex);
624     if (sinkIter == mSinks.end())
625         return (E_NON_EXISTENT); //not found!
626
627     asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
628     if ((work = mPool.startWork(worker)) == -1)
629     {
630         logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!");
631         delete worker;
632         return (E_NOT_POSSIBLE);
633     }
634
635     //save the handle related to the workerID
636     pthread_mutex_lock(&mMapHandleWorkerMutex);
637     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
638     pthread_mutex_unlock(&mMapHandleWorkerMutex);
639
640     return (E_OK);
641 }
642
643 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
644 {
645     //todo: implement crossfader
646     (void) handle;
647     (void) crossfaderID;
648     (void) hotSink;
649     (void) rampType;
650     (void) time;
651     return E_NOT_USED;
652 }
653
654 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
655 {
656     assert(mReceiveInterface!=0);
657     assert(domainID!=0);
658
659     //check if we can take the job
660     am_Domain_s domain;
661     int16_t work = -1;
662
663     //find the sink
664     pthread_mutex_lock(&mDomainsMutex);
665     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
666     for (; domainIter != mDomains.end(); ++domainIter)
667     {
668         if (domainIter->domainID == domainID)
669         {
670             domain = *domainIter;
671             break;
672         }
673     }
674     pthread_mutex_unlock(&mDomainsMutex);
675     if (domainIter == mDomains.end())
676         return (E_NON_EXISTENT); //not found!
677
678     asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
679     if ((work = mPool.startWork(worker)) == -1)
680     {
681         logError("AsyncRoutingSender::setDomainState not enough threads!");
682         delete worker;
683         return (E_NOT_POSSIBLE);
684     }
685
686     return (E_OK);
687
688 }
689
690 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SoundProperty_s & soundProperty)
691 {
692     assert(mReceiveInterface!=0);
693     assert(handle.handle!=0);
694     assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
695     assert(sourceID!=0);
696
697     //check if we can take the job
698     am_Source_s source;
699     int16_t work = -1;
700
701     //find the source
702     pthread_mutex_lock(&mSourcesMutex);
703     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
704     for (; sourceIter != mSources.end(); ++sourceIter)
705     {
706         if (sourceIter->sourceID == sourceID)
707         {
708             source = *sourceIter;
709             break;
710         }
711     }
712     pthread_mutex_unlock(&mSourcesMutex);
713     if (sourceIter == mSources.end())
714         return (E_NON_EXISTENT); //not found!
715
716     asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
717     if ((work = mPool.startWork(worker)) == -1)
718     {
719         logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
720         delete worker;
721         return (E_NOT_POSSIBLE);
722     }
723
724     //save the handle related to the workerID
725     pthread_mutex_lock(&mMapHandleWorkerMutex);
726     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
727     pthread_mutex_unlock(&mMapHandleWorkerMutex);
728
729     return (E_OK);
730 }
731
732 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
733 {
734     BusName = "RoutingAsync";
735     return (E_OK);
736 }
737
738 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
739 {
740     //just write two domains into the table and return it
741     std::vector<am_Domain_s> table;
742     am_Domain_s item;
743     item.busname = "RoutingAsync";
744     item.domainID = 0;
745     item.early = false;
746     item.name = "AsyncDomain1";
747     item.nodename = "AsyncNode1";
748     item.state = DS_CONTROLLED;
749     table.push_back(item);
750     item.busname = "RoutingAsync";
751     item.domainID = 0;
752     item.early = false;
753     item.name = "AsyncDomain2";
754     item.nodename = "AsyncNode2";
755     item.state = DS_CONTROLLED;
756     table.push_back(item);
757     return (table);
758 }
759
760 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
761 {
762     //create a bunch full of sinks
763     std::vector<am_Sink_s> table;
764     am_Sink_s item;
765     am_SoundProperty_s sp;
766     sp.type = SP_BASS;
767     sp.value = 0;
768     for (int16_t i = 0; i <= 10; i++)
769     {
770         std::stringstream temp;
771         temp << i;
772         item.domainID = 0; //we cannot know this when the table is created !
773         item.name = "mySink" + temp.str();
774         item.sinkID = i; //take fixed ids to make thins easy
775         item.sinkClassID = 1;
776         item.volume = 0;
777         item.listSoundProperties.push_back(sp);
778         item.visible = true;
779         item.listConnectionFormats.push_back(CF_ANALOG);
780         table.push_back(item);
781     }
782     return (table);
783 }
784
785 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
786 {
787     //create a bunch full of sources
788     std::vector<am_Source_s> table;
789     am_Source_s item;
790     for (int16_t i = 0; i <= 10; i++)
791     {
792         std::stringstream temp;
793         temp << i;
794         item.domainID = 0; //we cannot know this when the table is created !
795         item.name = "mySource" + temp.str();
796         item.sourceID = i; //take fixed ids to make thins easy
797         item.sourceClassID = 1;
798         item.volume = 0;
799         item.visible = true;
800         item.listConnectionFormats.push_back(CF_ANALOG);
801         table.push_back(item);
802     }
803     return (table);
804 }
805
806 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
807 {
808     pthread_mutex_lock(&mMapConnectionMutex);
809     mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
810     pthread_mutex_unlock(&mMapConnectionMutex);
811 }
812
813 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
814 {
815     pthread_mutex_lock(&mMapHandleWorkerMutex);
816     if (mMapHandleWorker.erase(handle))
817     {
818         logError("AsyncRoutingSender::removeHandle could not remove handle");
819     }
820     pthread_mutex_unlock(&mMapHandleWorkerMutex);
821 }
822
823 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
824 {
825     pthread_mutex_lock(&mMapConnectionMutex);
826     if (mMapConnectionIDRoute.erase(connectionID))
827     {
828         logError("AsyncRoutingSender::removeConnectionSafe could not remove connection");
829     }
830     pthread_mutex_unlock(&mMapConnectionMutex);
831 }
832
833 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
834 {
835     pthread_mutex_lock(&mSinksMutex);
836     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
837     for (; sinkIter != mSinks.end(); ++sinkIter)
838     {
839         if (sinkIter->sinkID == sinkID)
840         {
841             sinkIter->volume = volume;
842             break;
843         }
844     }
845     pthread_mutex_unlock(&mSinksMutex);
846 }
847
848 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
849 {
850     pthread_mutex_lock(&mSourcesMutex);
851     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
852     for (; sourceIter != mSources.end(); ++sourceIter)
853     {
854         if (sourceIter->sourceID == sourceID)
855         {
856             sourceIter->volume = volume;
857             break;
858         }
859     }
860     pthread_mutex_unlock(&mSourcesMutex);
861 }
862
863 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
864 {
865     pthread_mutex_lock(&mSourcesMutex);
866     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
867     for (; sourceIter != mSources.end(); ++sourceIter)
868     {
869         if (sourceIter->sourceID == sourceID)
870         {
871             sourceIter->sourceState = state;
872             break;
873         }
874     }
875     pthread_mutex_unlock(&mSourcesMutex);
876 }
877
878 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
879 {
880     pthread_mutex_lock(&mSinksMutex);
881     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
882     for (; sinkIter != mSinks.end(); ++sinkIter)
883     {
884         if (sinkIter->sinkID == sinkID)
885         {
886             std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
887             for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
888             {
889                 if (spIterator->type == soundProperty.type)
890                 {
891                     spIterator->value = soundProperty.value;
892                     break;
893                 }
894             }
895         }
896     }
897     pthread_mutex_unlock(&mSinksMutex);
898 }
899
900 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
901 {
902     pthread_mutex_lock(&mSourcesMutex);
903     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
904     for (; sourceIter != mSources.end(); ++sourceIter)
905     {
906         if (sourceIter->sourceID == sourceID)
907         {
908             std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
909             for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
910             {
911                 if (spIterator->type == soundProperty.type)
912                 {
913                     spIterator->value = soundProperty.value;
914                     break;
915                 }
916             }
917         }
918     }
919     pthread_mutex_unlock(&mSourcesMutex);
920 }
921
922 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
923 {
924     pthread_mutex_lock(&mDomainsMutex);
925     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
926     for (; domainIter != mDomains.end(); ++domainIter)
927     {
928         if (domainIter->domainID == domainID)
929         {
930             domainIter->state = domainState;
931             break;
932         }
933     }
934     pthread_mutex_unlock(&mDomainsMutex);
935 }
936
937 uint16_t AsyncRoutingSender::getInterfaceVersion() const
938 {
939     return (RoutingSendVersion);
940 }
941
942 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const am_sourceID_t sourceID, const std::vector<am_SoundProperty_s> & listSoundProperties)
943 {
944     //todo: implement
945     (void) handle;
946     (void) sourceID;
947     (void) listSoundProperties;
948     return (E_NOT_USED);
949 }
950
951 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const am_sinkID_t sinkID, const std::vector<am_SoundProperty_s> & listSoundProperties)
952 {
953     //todo: implement
954     (void) handle;
955     (void) sinkID;
956     (void) listSoundProperties;
957     return (E_NOT_USED);
958 }
959
960 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
961 {
962     std::vector<am_Gateway_s> table;
963     am_Gateway_s item;
964     item.name = "myGateway";
965     item.sinkID = 2;
966     item.sourceID = 2;
967     table.push_back(item);
968     return (table);
969 }
970
971 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
972         Worker(pool), //
973         mAsyncSender(asyncSender), //
974         mShadow(shadow), //
975         mHandle(handle), //
976         mConnectionID(connectionID), //
977         mSourceID(sourceID), //
978         mSinkID(sinkID), //
979         mConnectionFormat(connectionFormat)
980 {
981 }
982
983 void asycConnectWorker::start2work()
984 {
985     logInfo("Start connecting");
986     timespec t;
987     t.tv_nsec = 0;
988     t.tv_sec = 1;
989
990     //do something for one second
991     if (timedWait(t))
992         return;
993     am_RoutingElement_s route;
994     route.sinkID = mSinkID;
995     route.sourceID = mSourceID;
996     route.connectionFormat = mConnectionFormat;
997
998     //enter new connectionID into the list
999     mAsyncSender->insertConnectionSafe(mConnectionID, route);
1000
1001     //send the ack
1002     mShadow->ackConnect(mHandle, mConnectionID, E_OK);
1003
1004     //destroy the handle
1005     mAsyncSender->removeHandleSafe(mHandle.handle);
1006 }
1007
1008 void asycConnectWorker::cancelWork()
1009 {
1010     mAsyncSender->removeHandleSafe(mHandle.handle);
1011     mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
1012 }
1013
1014 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
1015         Worker(pool), //
1016         mAsyncSender(asyncSender), //
1017         mShadow(shadow), //
1018         mHandle(handle), //
1019         mConnectionID(connectionID)
1020 {
1021 }
1022
1023 void asycDisConnectWorker::start2work()
1024 {
1025     logInfo("Start disconnecting");
1026     timespec t;
1027     t.tv_nsec = 0;
1028     t.tv_sec = 1;
1029
1030     //do something for one second
1031     if (timedWait(t))
1032         return;
1033     am_RoutingElement_s route;
1034
1035     //enter new connectionID into the list
1036     mAsyncSender->insertConnectionSafe(mConnectionID, route);
1037
1038     //send the ack
1039     mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1040
1041     //destroy the handle
1042     mAsyncSender->removeHandleSafe(mHandle.handle);
1043
1044 }
1045
1046 void asycDisConnectWorker::cancelWork()
1047 {
1048     mAsyncSender->removeHandleSafe(mHandle.handle);
1049     mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
1050 }
1051
1052 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1053         Worker(pool), //
1054         mAsyncSender(asyncSender), //
1055         mShadow(shadow), //
1056         mCurrentVolume(currentVolume), //
1057         mHandle(handle), //
1058         mSinkID(sinkID), //
1059         mVolume(volume), //
1060         mRamp(ramp), //
1061         mTime(time)
1062 {
1063 }
1064
1065 void asyncSetSinkVolumeWorker::start2work()
1066 {
1067     //todo: this implementation does not respect time and ramp....
1068     logInfo("Start setting sink volume");
1069     timespec t;
1070     t.tv_nsec = 10000000;
1071     t.tv_sec = 0;
1072
1073     while (mCurrentVolume != mVolume)
1074     {
1075         if (mCurrentVolume < mVolume)
1076             mCurrentVolume++;
1077         else
1078             mCurrentVolume--;
1079         mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
1080         if (timedWait(t))
1081             return;
1082     }
1083
1084     //enter new connectionID into the list
1085     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1086
1087     //send the ack
1088     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1089
1090     //destroy the handle
1091     mAsyncSender->removeHandleSafe(mHandle.handle);
1092 }
1093
1094 void asyncSetSinkVolumeWorker::cancelWork()
1095 {
1096     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1097     mAsyncSender->removeHandleSafe(mHandle.handle);
1098     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1099 }
1100
1101 asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1102         Worker(pool), //
1103         mAsyncSender(asyncSender), //
1104         mShadow(shadow), //
1105         mCurrentVolume(currentVolume), //
1106         mHandle(handle), //
1107         mSourceID(SourceID), //
1108         mVolume(volume), //
1109         mRamp(ramp), //
1110         mTime(time)
1111 {
1112 }
1113
1114 void asyncSetSourceVolumeWorker::start2work()
1115 {
1116     //todo: this implementation does not respect time and ramp....
1117     logInfo("Start setting source volume");
1118     timespec t;
1119     t.tv_nsec = 10000000;
1120     t.tv_sec = 0;
1121
1122     while (mCurrentVolume != mVolume)
1123     {
1124         if (mCurrentVolume < mVolume)
1125             mCurrentVolume++;
1126         else
1127             mCurrentVolume--;
1128         mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
1129         if (timedWait(t))
1130             return;
1131     }
1132
1133     //enter new connectionID into the list
1134     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1135
1136     //send the ack
1137     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1138
1139     //destroy the handle
1140     mAsyncSender->removeHandleSafe(mHandle.handle);
1141 }
1142
1143 void asyncSetSourceVolumeWorker::cancelWork()
1144 {
1145     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1146     mAsyncSender->removeHandleSafe(mHandle.handle);
1147     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1148 }
1149
1150 asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1151         Worker(pool), //
1152         mAsyncSender(asyncSender), //
1153         mShadow(shadow), //
1154         mHandle(handle), //
1155         mSourceID(sourceID), //
1156         mSourcestate(state)
1157 {
1158 }
1159
1160 void asyncSetSourceStateWorker::start2work()
1161 {
1162     logInfo("Start setting source state");
1163     timespec t;
1164     t.tv_nsec = 0;
1165     t.tv_sec = 1;
1166
1167     //do something for one second
1168     if (timedWait(t))
1169         return;
1170
1171     //enter new connectionID into the list
1172     mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
1173
1174     //send the ack
1175     mShadow->ackSetSourceState(mHandle, E_OK);
1176
1177     //destroy the handle
1178     mAsyncSender->removeHandleSafe(mHandle.handle);
1179 }
1180
1181 void asyncSetSourceStateWorker::cancelWork()
1182 {
1183     //send the ack
1184     mShadow->ackSetSourceState(mHandle, E_ABORTED);
1185
1186     //destroy the handle
1187     mAsyncSender->removeHandleSafe(mHandle.handle);
1188 }
1189
1190 asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1191         Worker(pool), //
1192         mAsyncSender(asyncSender), //
1193         mShadow(shadow), //
1194         mHandle(handle), //
1195         mSinkID(sinkID), //
1196         mSoundProperty(soundProperty)
1197 {
1198 }
1199
1200 void asyncSetSinkSoundPropertyWorker::start2work()
1201 {
1202     logInfo("Start setting sink sound property");
1203     timespec t;
1204     t.tv_nsec = 0;
1205     t.tv_sec = 1;
1206
1207     //do something for one second
1208     if (timedWait(t))
1209         return;
1210
1211     //enter new connectionID into the list
1212     mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
1213
1214     //send the ack
1215     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1216
1217     //destroy the handle
1218     mAsyncSender->removeHandleSafe(mHandle.handle);
1219 }
1220
1221 void asyncSetSinkSoundPropertyWorker::cancelWork()
1222 {
1223     //send the ack
1224     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1225
1226     //destroy the handle
1227     mAsyncSender->removeHandleSafe(mHandle.handle);
1228 }
1229
1230 asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1231         Worker(pool), //
1232         mAsyncSender(asyncSender), //
1233         mShadow(shadow), //
1234         mHandle(handle), //
1235         mSourceID(sourceID), //
1236         mSoundProperty(soundProperty)
1237 {
1238 }
1239
1240 void asyncSetSourceSoundPropertyWorker::start2work()
1241 {
1242     logInfo("Start setting source sound property");
1243     timespec t;
1244     t.tv_nsec = 0;
1245     t.tv_sec = 1;
1246
1247     //do something for one second
1248     if (timedWait(t))
1249         return;
1250
1251     //enter new connectionID into the list
1252     mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
1253
1254     //send the ack
1255     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1256
1257     //destroy the handle
1258     mAsyncSender->removeHandleSafe(mHandle.handle);
1259 }
1260
1261 void asyncSetSourceSoundPropertyWorker::cancelWork()
1262 {
1263     //send the ack
1264     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1265
1266     //destroy the handle
1267     mAsyncSender->removeHandleSafe(mHandle.handle);
1268 }
1269
1270 asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1271         Worker(pool), //
1272         mAsyncSender(asyncSender), //
1273         mShadow(shadow), //
1274         mDomainID(domainID), //
1275         mDomainState(domainState)
1276 {
1277 }
1278
1279 void asyncDomainStateChangeWorker::start2work()
1280 {
1281     //todo: sendchanged data must be in here !
1282     logInfo("Start setting source sound property");
1283     timespec t;
1284     t.tv_nsec = 0;
1285     t.tv_sec = 1;
1286
1287     //do something for one second
1288     if (timedWait(t))
1289         return;
1290
1291     //enter new connectionID into the list
1292     mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1293     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1294     //send the new status
1295
1296 }
1297
1298 void am::asyncDomainStateChangeWorker::cancelWork()
1299 {
1300     //send the new status
1301     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1302 }
1303