* formatting all the source code with eclipse source code style
[profile/ivi/genivi/genivi-audio-manager.git] / PluginRoutingInterfaceAsync / src / RoutingSenderAsync.cpp
1 /**
2  * Copyright (C) 2011, BMW AG
3  *
4  * GeniviAudioManager DbusPlugin
5  *
6  * \file RoutingSenderAsync.cpp
7  *
8  * \date 20-Oct-2011 3:42:04 PM
9  * \author Christian Mueller (christian.ei.mueller@bmw.de)
10  *
11  * \section License
12  * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13  * Copyright (C) 2011, BMW AG Christian Mueller  Christian.ei.mueller@bmw.de
14  *
15  * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17  * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18  * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19  * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20  * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21  * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
22  *
23  */
24
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
27 #include <algorithm>
28 #include <vector>
29 #include <poll.h>
30 #include <errno.h>
31 #include <time.h>
32 #include <assert.h>
33 #include <sstream>
34 #include <string>
35 #include <dbus/dbus.h>
36
37 using namespace am;
38
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
40
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
42 {
43     return (new AsyncRoutingSender());
44 }
45
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
47 {
48     delete routingSendInterface;
49 }
50
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
57
58 void* AsyncRoutingSender::InterruptEvents(void *data)
59 {
60     //This is a very very very basic implementation of the dbus interface
61     //there is not failure handling, nothing.
62     //it is used just for testing, not intended to be used otherwise...
63     RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
64     DBusError err;
65     DBusMessage* msg;
66     DBusConnection* conn;
67     dbus_error_init(&err);
68     conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
69     dbus_uint32_t serial = 0;
70     DBusMessage* reply;
71     DBusMessageIter args;
72     int answer = dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
73
74     while (dbus_connection_read_write_dispatch(conn, -1))
75     {
76         dbus_connection_read_write(conn, 0);
77         msg = dbus_connection_pop_message(conn);
78
79         if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
80         {
81             am_connectionID_t connectionID;
82             am_timeSync_t delay;
83             dbus_message_iter_init(msg, &args);
84             dbus_message_iter_get_basic(&args,(void*) &connectionID);
85             dbus_message_iter_next(&args);
86             dbus_message_iter_get_basic(&args,(void*) &delay);
87             reply = dbus_message_new_method_return(msg);
88             dbus_message_iter_init_append(reply, &args);
89             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
90             dbus_connection_send(conn, reply, &serial);
91             shadow->hookTimingInformationChanged(connectionID,delay);
92             dbus_message_unref(reply);
93         }
94         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
95         {
96             am_sinkID_t sinkID;
97             am_timeSync_t delay;
98             am_Availability_s availability;
99             dbus_message_iter_init(msg, &args);
100             dbus_message_iter_get_basic(&args,(void*) &sinkID);
101             reply = dbus_message_new_method_return(msg);
102             dbus_message_iter_init_append(reply, &args);
103             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
104             dbus_connection_send(conn, reply, &serial);
105             shadow->hookSinkAvailablityStatusChange(sinkID,availability);
106             dbus_message_unref(reply);
107         }
108         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
109         {
110             am_sourceID_t sourceID;
111             am_timeSync_t delay;
112             am_Availability_s availability;
113             dbus_message_iter_init(msg, &args);
114             dbus_message_iter_get_basic(&args,(void*) &sourceID);
115             reply = dbus_message_new_method_return(msg);
116             dbus_message_iter_init_append(reply, &args);
117             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
118             dbus_connection_send(conn, reply, &serial);
119             shadow->hookSourceAvailablityStatusChange(sourceID,availability);
120             dbus_message_unref(reply);
121         }
122         else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
123         {
124             am_sourceID_t sourceID;
125
126             am_InterruptState_e state;
127             dbus_message_iter_init(msg, &args);
128             dbus_message_iter_get_basic(&args,(void*) &sourceID);
129             reply = dbus_message_new_method_return(msg);
130             dbus_message_iter_init_append(reply, &args);
131             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
132             dbus_connection_send(conn, reply, &serial);
133             shadow->hookInterruptStatusChange(sourceID,state);
134             dbus_message_unref(reply);
135         }
136         dbus_connection_flush(conn);
137     }
138 }
139
140 void *WorkerThreadPool::WorkerThread(void* data)
141 {
142     threadInfo_s *myInfo=(threadInfo_s*)data;
143     while (1)
144     {
145         sem_wait(&myInfo->block);
146         pthread_mutex_lock(&mBlockingMutex);
147         Worker* actWorker=myInfo->worker;
148         pthread_mutex_unlock(&mBlockingMutex);
149         actWorker->setCancelSempaphore(&myInfo->cancel);
150         actWorker->start2work();
151         actWorker->pPool->finishedWork(myInfo->threadID);
152     }
153 }
154
155 WorkerThreadPool::WorkerThreadPool(int numThreads):
156         mNumThreads(numThreads)
157 {
158     int workerID=0;
159     mListWorkers.resize(mNumThreads);
160     for (int i=0;i<mNumThreads;i++)
161     {
162         sem_init(&mListWorkers[i].block,NULL,NULL);
163         sem_init(&mListWorkers[i].cancel,NULL,NULL);
164         mListWorkers[i].busy=false;
165         mListWorkers[i].workerID=++workerID;
166         pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
167     }
168 }
169
170 int16_t WorkerThreadPool::startWork(Worker *worker)
171 {
172     pthread_mutex_lock(&mBlockingMutex);
173     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
174     for(;it!=mListWorkers.end();++it)
175     {
176         if(!it->busy)
177         {
178             it->worker=worker;
179             it->busy=true;
180             pthread_mutex_unlock(&mBlockingMutex);
181             sem_post(&it->block);
182             return ((int)it->workerID);
183         }
184     }
185     pthread_mutex_unlock(&mBlockingMutex);
186     return (-1);
187 }
188
189 bool WorkerThreadPool::cancelWork(int workerID)
190 {
191     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
192     for(;it!=mListWorkers.end();++it)
193     {
194         if(it->workerID==workerID && it->busy)
195         {
196             sem_post(&it->cancel);
197             return (true);
198         }
199     }
200     return (false);
201 }
202
203 void WorkerThreadPool::finishedWork(pthread_t threadID)
204 {
205     pthread_mutex_lock(&mBlockingMutex);
206     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
207     for(;it!=mListWorkers.end();++it)
208     {
209         if(it->threadID==threadID)
210         {
211             it->busy=false;
212             delete it->worker;
213             break;
214         }
215     }
216     pthread_mutex_unlock(&mBlockingMutex);
217 }
218
219 WorkerThreadPool::~WorkerThreadPool()
220 {
221     for (int i=0;i<mNumThreads;i++)
222     {
223         pthread_cancel(mListWorkers[i].threadID);
224     }
225     }
226
227 Worker::Worker(WorkerThreadPool *pool):
228     pPool(pool), //
229     mCancelSem()
230 {
231 }
232
233 void Worker::setCancelSempaphore(sem_t* cancel)
234 {
235     mCancelSem=cancel;
236 }
237
238 bool Worker::timedWait(timespec timer)
239 {
240     timespec temp;
241     if(clock_gettime(0, &temp)==-1)
242     {
243         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
244     }
245     temp.tv_nsec+=timer.tv_nsec;
246     temp.tv_sec+=timer.tv_sec;
247     //if(sem_wait(mCancelSem)==-1)
248     if (sem_timedwait(mCancelSem,&temp)==-1)
249     {
250         //a timeout happened
251         if(errno == ETIMEDOUT)
252         {
253             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
254             return (false);
255         }
256         else //failure in waiting, nevertheless, we quit the thread...
257         {
258             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
259             return (true);
260         }
261     }
262     DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
263     this->cancelWork();
264     return (true);
265 }
266
267 AsyncRoutingSender::AsyncRoutingSender():
268     mShadow(), //
269     mReceiveInterface(0), //
270     mDomains(createDomainTable()), //
271     mSinks(createSinkTable()), //
272     mSources ( createSourceTable ( ) ), //
273     mGateways ( createGatewayTable ( ) ) , //
274     mMapHandleWorker ( ), //
275     mMapConnectionIDRoute(),//
276     mPool(10)
277 {
278     DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
279     DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
280 }
281
282 AsyncRoutingSender::~AsyncRoutingSender()
283 {
284 }
285
286 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
287 {
288     //first, create the Shadow:
289     assert(routingreceiveinterface!=0);
290     mReceiveInterface = routingreceiveinterface;
291     mShadow.setRoutingInterface(routingreceiveinterface);
292 }
293
294 void AsyncRoutingSender::routingInterfacesReady()
295 {
296     assert(mReceiveInterface!=0);
297     am_Error_e eCode;
298     //first register the domains
299     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
300     for (; domainIter != mDomains.end(); ++domainIter)
301     {
302         am_domainID_t domainID;
303         if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
304         {
305             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
306         }
307         domainIter->domainID = domainID;
308     }
309
310     //then sources
311     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
312     for (; sourceIter != mSources.end(); ++sourceIter)
313     {
314         am_sourceID_t sourceID;
315         //set the correct domainID
316         sourceIter->domainID = mDomains[0].domainID;
317         if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
318         {
319             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
320         }
321     }
322
323     //sinks
324     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
325     for (; sinkIter != mSinks.end(); ++sinkIter)
326     {
327         am_sinkID_t sinkID;
328         //set the correct domainID
329         sinkIter->domainID = mDomains[0].domainID;
330         if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
331         {
332             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
333         }
334     }
335
336     //gateways
337 //      std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
338 //      for(;gatewayIter!=mGateways.end();++gatewayIter)
339 //      {
340 //              am_gatewayID_t gatewayID;
341 //              gatewayIter->domainSinkID=mDomains[0].domainID;
342 //              gatewayIter->domainSourceID=mDomains[1].domainID;
343 //              gatewayIter->controlDomainID=mDomains[0].domainID;
344 //              if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
345 //              {
346 //                      DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
347 //              }
348 //              gatewayIter->gatewayID=gatewayID;
349 //      }
350
351     //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
352     //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
353     //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
354 }
355
356 void AsyncRoutingSender::routingInterfacesRundown()
357 {
358     assert(mReceiveInterface!=0);
359 }
360
361 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
362 {
363     assert(mReceiveInterface!=0);
364     assert(handle.handle!=0);
365
366     //first check if we know the handle
367     pthread_mutex_lock(&mMapHandleWorkerMutex);
368     std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
369     if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
370     {
371         pthread_mutex_unlock(&mMapHandleWorkerMutex);
372         return (E_NON_EXISTENT);
373     }
374     pthread_mutex_unlock(&mMapHandleWorkerMutex);
375
376     //ok, cancel the action:
377     if (mPool.cancelWork(iter->second)) return (E_OK);
378     return (E_UNKNOWN);
379 }
380
381 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
382 {
383     assert(mReceiveInterface!=0);
384     assert(handle.handle!=0);
385     assert(handle.handleType==H_CONNECT);
386     assert(connectionID!=0);
387     assert(sinkID!=0);
388     assert(sourceID!=0);
389
390     //check if we can take the job
391     am_Sink_s sink;
392     am_Source_s source;
393     int16_t work = -1;
394
395     //find the sink
396     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
397     for (; sinkIter != mSinks.end(); ++sinkIter)
398     {
399         if (sinkIter->sinkID == sinkID)
400         {
401             sink = *sinkIter;
402             break;
403         }
404     }
405     if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
406
407     //find the source
408     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
409     for (; sourceIter != mSources.end(); ++sourceIter)
410     {
411         if (sourceIter->sourceID == sourceID)
412         {
413             source = *sourceIter;
414             break;
415         }
416     }
417     if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
418
419     //check the format
420     if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end()) return (E_WRONG_FORMAT);
421     if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end()) return (E_WRONG_FORMAT);
422
423     //the operation is ok, lets create a worker, assign it to a task in the task pool
424     asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
425     if ((work = mPool.startWork(worker)) == -1)
426     {
427         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
428         delete worker;
429         return (E_NOT_POSSIBLE);
430     }
431
432     //save the handle related to the workerID
433     pthread_mutex_lock(&mMapHandleWorkerMutex);
434     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
435     pthread_mutex_unlock(&mMapHandleWorkerMutex);
436
437     return (E_OK);
438 }
439
440 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
441 {
442     assert(mReceiveInterface!=0);
443     assert(handle.handle!=0);
444     assert(handle.handleType==H_DISCONNECT);
445     assert(connectionID!=0);
446
447     //check if we can take the job
448     int16_t work = -1;
449
450     pthread_mutex_lock(&mMapConnectionMutex);
451     if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
452     {
453         pthread_mutex_unlock(&mMapConnectionMutex);
454         return (E_NON_EXISTENT);
455     }
456     pthread_mutex_unlock(&mMapConnectionMutex);
457
458     //the operation is ok, lets create a worker, assign it to a task in the task pool
459     asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
460     if ((work = mPool.startWork(worker)) == -1)
461     {
462         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
463         delete worker;
464         return (E_NOT_POSSIBLE);
465     }
466
467     //save the handle related to the workerID
468     pthread_mutex_lock(&mMapHandleWorkerMutex);
469     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
470     pthread_mutex_unlock(&mMapHandleWorkerMutex);
471
472     return (E_OK);
473 }
474
475 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
476 {
477     assert(mReceiveInterface!=0);
478     assert(handle.handle!=0);
479     assert(handle.handleType==H_SETSINKVOLUME);
480     assert(sinkID!=0);
481
482     //check if we can take the job
483     am_Sink_s sink;
484     int16_t work = -1;
485
486     //find the sink
487     pthread_mutex_lock(&mSinksMutex);
488     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
489     for (; sinkIter != mSinks.end(); ++sinkIter)
490     {
491         if (sinkIter->sinkID == sinkID)
492         {
493             sink = *sinkIter;
494             break;
495         }
496     }
497     pthread_mutex_unlock(&mSinksMutex);
498     if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
499
500     asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
501     if ((work = mPool.startWork(worker)) == -1)
502     {
503         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
504         delete worker;
505         return (E_NOT_POSSIBLE);
506     }
507
508     //save the handle related to the workerID
509     pthread_mutex_lock(&mMapHandleWorkerMutex);
510     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
511     pthread_mutex_unlock(&mMapHandleWorkerMutex);
512
513     return (E_OK);
514 }
515
516 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
517 {
518     assert(mReceiveInterface!=0);
519     assert(handle.handle!=0);
520     assert(handle.handleType==H_SETSOURCEVOLUME);
521     assert(sourceID!=0);
522
523     //check if we can take the job
524     am_Source_s source;
525     int16_t work = -1;
526
527     //find the sink
528     pthread_mutex_lock(&mSourcesMutex);
529     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
530     for (; sourceIter != mSources.end(); ++sourceIter)
531     {
532         if (sourceIter->sourceID == sourceID)
533         {
534             source = *sourceIter;
535             break;
536         }
537     }
538     pthread_mutex_unlock(&mSourcesMutex);
539     if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
540
541     asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
542     if ((work = mPool.startWork(worker)) == -1)
543     {
544         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
545         delete worker;
546         return (E_NOT_POSSIBLE);
547     }
548
549     //save the handle related to the workerID
550     pthread_mutex_lock(&mMapHandleWorkerMutex);
551     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
552     pthread_mutex_unlock(&mMapHandleWorkerMutex);
553
554     return (E_OK);
555 }
556
557 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
558 {
559     assert(mReceiveInterface!=0);
560     assert(handle.handle!=0);
561     assert(handle.handleType==H_SETSOURCESTATE);
562     assert(sourceID!=0);
563
564     //check if we can take the job
565     am_Source_s source;
566     int16_t work = -1;
567
568     //find the source
569     pthread_mutex_lock(&mSourcesMutex);
570     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
571     for (; sourceIter != mSources.end(); ++sourceIter)
572     {
573         if (sourceIter->sourceID == sourceID)
574         {
575             source = *sourceIter;
576             break;
577         }
578     }
579     pthread_mutex_unlock(&mSourcesMutex);
580     if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
581
582     asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
583     if ((work = mPool.startWork(worker)) == -1)
584     {
585         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
586         delete worker;
587         return (E_NOT_POSSIBLE);
588     }
589
590     //save the handle related to the workerID
591     pthread_mutex_lock(&mMapHandleWorkerMutex);
592     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
593     pthread_mutex_unlock(&mMapHandleWorkerMutex);
594
595     return (E_OK);
596 }
597
598 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_SoundProperty_s& soundProperty, const am_sinkID_t sinkID)
599 {
600     assert(mReceiveInterface!=0);
601     assert(handle.handle!=0);
602     assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
603     assert(sinkID!=0);
604
605     //check if we can take the job
606     am_Sink_s sink;
607     int16_t work = -1;
608
609     //find the sink
610     pthread_mutex_lock(&mSinksMutex);
611     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
612     for (; sinkIter != mSinks.end(); ++sinkIter)
613     {
614         if (sinkIter->sinkID == sinkID)
615         {
616             sink = *sinkIter;
617             break;
618         }
619     }
620     pthread_mutex_unlock(&mSinksMutex);
621     if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
622
623     asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
624     if ((work = mPool.startWork(worker)) == -1)
625     {
626         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
627         delete worker;
628         return (E_NOT_POSSIBLE);
629     }
630
631     //save the handle related to the workerID
632     pthread_mutex_lock(&mMapHandleWorkerMutex);
633     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
634     pthread_mutex_unlock(&mMapHandleWorkerMutex);
635
636     return (E_OK);
637 }
638
639 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
640 {
641     //todo: implement crossfader
642     return E_NOT_USED;
643 }
644
645 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
646 {
647     assert(mReceiveInterface!=0);
648     assert(domainID!=0);
649
650     //check if we can take the job
651     am_Domain_s domain;
652     int16_t work = -1;
653
654     //find the sink
655     pthread_mutex_lock(&mDomainsMutex);
656     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
657     for (; domainIter != mDomains.end(); ++domainIter)
658     {
659         if (domainIter->domainID == domainID)
660         {
661             domain = *domainIter;
662             break;
663         }
664     }
665     pthread_mutex_unlock(&mDomainsMutex);
666     if (domainIter == mDomains.end()) return (E_NON_EXISTENT); //not found!
667
668     asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
669     if ((work = mPool.startWork(worker)) == -1)
670     {
671         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
672         delete worker;
673         return (E_NOT_POSSIBLE);
674     }
675
676     return (E_OK);
677
678 }
679
680 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_SoundProperty_s & soundProperty, const am_sourceID_t sourceID)
681 {
682     assert(mReceiveInterface!=0);
683     assert(handle.handle!=0);
684     assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
685     assert(sourceID!=0);
686
687     //check if we can take the job
688     am_Source_s source;
689     int16_t work = -1;
690
691     //find the source
692     pthread_mutex_lock(&mSourcesMutex);
693     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
694     for (; sourceIter != mSources.end(); ++sourceIter)
695     {
696         if (sourceIter->sourceID == sourceID)
697         {
698             source = *sourceIter;
699             break;
700         }
701     }
702     pthread_mutex_unlock(&mSourcesMutex);
703     if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
704
705     asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
706     if ((work = mPool.startWork(worker)) == -1)
707     {
708         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
709         delete worker;
710         return (E_NOT_POSSIBLE);
711     }
712
713     //save the handle related to the workerID
714     pthread_mutex_lock(&mMapHandleWorkerMutex);
715     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
716     pthread_mutex_unlock(&mMapHandleWorkerMutex);
717
718     return (E_OK);
719 }
720
721 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
722 {
723     BusName = "RoutingAsync";
724     return (E_OK);
725 }
726
727 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
728 {
729     //just write two domains into the table and return it
730     std::vector<am_Domain_s> table;
731     am_Domain_s item;
732     item.busname = "RoutingAsync";
733     item.domainID = 0;
734     item.early = false;
735     item.name = "AsyncDomain1";
736     item.nodename = "AsyncNode1";
737     item.state = DS_CONTROLLED;
738     table.push_back(item);
739     item.busname = "RoutingAsync";
740     item.domainID = 0;
741     item.early = false;
742     item.name = "AsyncDomain2";
743     item.nodename = "AsyncNode2";
744     item.state = DS_CONTROLLED;
745     table.push_back(item);
746     return (table);
747 }
748
749 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
750 {
751     //create a bunch full of sinks
752     std::vector<am_Sink_s> table;
753     am_Sink_s item;
754     am_SoundProperty_s sp;
755     sp.type = SP_BASS;
756     sp.value = 0;
757     for (int16_t i = 0; i <= 10; i++)
758     {
759         std::stringstream temp;
760         temp << i;
761         item.domainID = 0;     //we cannot know this when the table is created !
762         item.name = "mySink" + temp.str();
763         item.sinkID = i;                 //take fixed ids to make thins easy
764         item.sinkClassID = 1;
765         item.volume = 0;
766         item.listSoundProperties.push_back(sp);
767         item.visible = true;
768         item.listConnectionFormats.push_back(CF_ANALOG);
769         table.push_back(item);
770     }
771     return (table);
772 }
773
774 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
775 {
776     //create a bunch full of sources
777     std::vector<am_Source_s> table;
778     am_Source_s item;
779     for (int16_t i = 0; i <= 10; i++)
780     {
781         std::stringstream temp;
782         temp << i;
783         item.domainID = 0;     //we cannot know this when the table is created !
784         item.name = "mySource" + temp.str();
785         item.sourceID = i;               //take fixed ids to make thins easy
786         item.sourceClassID = 1;
787         item.volume = 0;
788         item.visible = true;
789         item.listConnectionFormats.push_back(CF_ANALOG);
790         table.push_back(item);
791     }
792     return (table);
793 }
794
795 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
796 {
797     pthread_mutex_lock(&mMapConnectionMutex);
798     mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
799     pthread_mutex_unlock(&mMapConnectionMutex);
800 }
801
802 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
803 {
804     pthread_mutex_lock(&mMapHandleWorkerMutex);
805     if (mMapHandleWorker.erase(handle))
806     {
807         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
808     }
809     pthread_mutex_unlock(&mMapHandleWorkerMutex);
810 }
811
812 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
813 {
814     pthread_mutex_lock(&mMapConnectionMutex);
815     if (mMapConnectionIDRoute.erase(connectionID))
816     {
817         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
818     }
819     pthread_mutex_unlock(&mMapConnectionMutex);
820 }
821
822 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
823 {
824     pthread_mutex_lock(&mSinksMutex);
825     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
826     for (; sinkIter != mSinks.end(); ++sinkIter)
827     {
828         if (sinkIter->sinkID == sinkID)
829         {
830             sinkIter->volume = volume;
831             break;
832         }
833     }
834     pthread_mutex_unlock(&mSinksMutex);
835 }
836
837 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
838 {
839     pthread_mutex_lock(&mSourcesMutex);
840     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
841     for (; sourceIter != mSources.end(); ++sourceIter)
842     {
843         if (sourceIter->sourceID == sourceID)
844         {
845             sourceIter->volume = volume;
846             break;
847         }
848     }
849     pthread_mutex_unlock(&mSourcesMutex);
850 }
851
852 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
853 {
854     pthread_mutex_lock(&mSourcesMutex);
855     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
856     for (; sourceIter != mSources.end(); ++sourceIter)
857     {
858         if (sourceIter->sourceID == sourceID)
859         {
860             sourceIter->sourceState = state;
861             break;
862         }
863     }
864     pthread_mutex_unlock(&mSourcesMutex);
865 }
866
867 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
868 {
869     pthread_mutex_lock(&mSinksMutex);
870     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
871     for (; sinkIter != mSinks.end(); ++sinkIter)
872     {
873         if (sinkIter->sinkID == sinkID)
874         {
875             std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
876             for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
877             {
878                 if (spIterator->type == soundProperty.type)
879                 {
880                     spIterator->value = soundProperty.value;
881                     break;
882                 }
883             }
884         }
885     }
886     pthread_mutex_unlock(&mSinksMutex);
887 }
888
889 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
890 {
891     pthread_mutex_lock(&mSourcesMutex);
892     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
893     for (; sourceIter != mSources.end(); ++sourceIter)
894     {
895         if (sourceIter->sourceID == sourceID)
896         {
897             std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
898             for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
899             {
900                 if (spIterator->type == soundProperty.type)
901                 {
902                     spIterator->value = soundProperty.value;
903                     break;
904                 }
905             }
906         }
907     }
908     pthread_mutex_unlock(&mSourcesMutex);
909 }
910
911 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
912 {
913     pthread_mutex_lock(&mDomainsMutex);
914     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
915     for (; domainIter != mDomains.end(); ++domainIter)
916     {
917         if (domainIter->domainID == domainID)
918         {
919             domainIter->state = domainState;
920             break;
921         }
922     }
923     pthread_mutex_unlock(&mDomainsMutex);
924 }
925
926 uint16_t AsyncRoutingSender::getInterfaceVersion() const
927 {
928     return (RoutingSendVersion);
929 }
930
931 am_Error_e am::AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const std::vector<am_SoundProperty_s> & listSoundProperties, const am_sinkID_t sinkID)
932 {
933     //todo: implement
934 }
935
936 am_Error_e am::AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const std::vector<am_SoundProperty_s> & listSoundProperties, const am_sourceID_t sourceID)
937 {
938     //todo: implement
939 }
940
941 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
942 {
943     std::vector<am_Gateway_s> table;
944     am_Gateway_s item;
945     item.name = "myGateway";
946     item.sinkID = 2;
947     item.sourceID = 2;
948     table.push_back(item);
949     return (table);
950 }
951
952 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
953         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mConnectionID(connectionID), mSourceID(sourceID), mSinkID(sinkID), mConnectionFormat(connectionFormat)
954 {
955 }
956
957 void asycConnectWorker::start2work()
958 {
959     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start connecting"));
960     timespec t;
961     t.tv_nsec = 0;
962     t.tv_sec = 1;
963
964     //do something for one second
965     if (timedWait(t)) return;
966     am_RoutingElement_s route;
967     route.sinkID = mSinkID;
968     route.sourceID = mSourceID;
969     route.connectionFormat = mConnectionFormat;
970
971     //enter new connectionID into the list
972     mAsyncSender->insertConnectionSafe(mConnectionID, route);
973
974     //send the ack
975     mShadow->ackConnect(mHandle, mConnectionID, E_OK);
976
977     //destroy the handle
978     mAsyncSender->removeHandleSafe(mHandle.handle);
979 }
980
981 void asycConnectWorker::cancelWork()
982 {
983     mAsyncSender->removeHandleSafe(mHandle.handle);
984     mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
985 }
986
987 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
988         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mConnectionID(connectionID)
989 {
990 }
991
992 void asycDisConnectWorker::start2work()
993 {
994     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
995     timespec t;
996     t.tv_nsec = 0;
997     t.tv_sec = 1;
998
999     //do something for one second
1000     if (timedWait(t)) return;
1001     am_RoutingElement_s route;
1002
1003     //enter new connectionID into the list
1004     mAsyncSender->insertConnectionSafe(mConnectionID, route);
1005
1006     //send the ack
1007     mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1008
1009     //destroy the handle
1010     mAsyncSender->removeHandleSafe(mHandle.handle);
1011
1012 }
1013
1014 void asycDisConnectWorker::cancelWork()
1015 {
1016     mAsyncSender->removeHandleSafe(mHandle.handle);
1017     mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
1018 }
1019
1020 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1021         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mCurrentVolume(currentVolume), mHandle(handle), mSinkID(sinkID), mVolume(volume), mRamp(ramp), mTime(time)
1022 {
1023 }
1024
1025 void asyncSetSinkVolumeWorker::start2work()
1026 {
1027     //todo: this implementation does not respect time and ramp....
1028     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
1029     timespec t;
1030     t.tv_nsec = 10000000;
1031     t.tv_sec = 0;
1032
1033     while (mCurrentVolume != mVolume)
1034     {
1035         if (mCurrentVolume < mVolume)
1036             mCurrentVolume++;
1037         else
1038             mCurrentVolume--;
1039         mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
1040         if (timedWait(t)) return;
1041     }
1042
1043     //enter new connectionID into the list
1044     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1045
1046     //send the ack
1047     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1048
1049     //destroy the handle
1050     mAsyncSender->removeHandleSafe(mHandle.handle);
1051 }
1052
1053 void asyncSetSinkVolumeWorker::cancelWork()
1054 {
1055     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1056     mAsyncSender->removeHandleSafe(mHandle.handle);
1057     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1058 }
1059
1060 am::asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1061         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mCurrentVolume(currentVolume), mHandle(handle), mSourceID(SourceID), mVolume(volume), mRamp(ramp), mTime(time)
1062 {
1063 }
1064
1065 void am::asyncSetSourceVolumeWorker::start2work()
1066 {
1067     //todo: this implementation does not respect time and ramp....
1068     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
1069     timespec t;
1070     t.tv_nsec = 10000000;
1071     t.tv_sec = 0;
1072
1073     while (mCurrentVolume != mVolume)
1074     {
1075         if (mCurrentVolume < mVolume)
1076             mCurrentVolume++;
1077         else
1078             mCurrentVolume--;
1079         mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
1080         if (timedWait(t)) return;
1081     }
1082
1083     //enter new connectionID into the list
1084     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1085
1086     //send the ack
1087     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1088
1089     //destroy the handle
1090     mAsyncSender->removeHandleSafe(mHandle.handle);
1091 }
1092
1093 void am::asyncSetSourceVolumeWorker::cancelWork()
1094 {
1095     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1096     mAsyncSender->removeHandleSafe(mHandle.handle);
1097     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1098 }
1099
1100 am::asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1101         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mSourceID(sourceID), mSourcestate(state)
1102 {
1103 }
1104
1105 void am::asyncSetSourceStateWorker::start2work()
1106 {
1107     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1108     timespec t;
1109     t.tv_nsec = 0;
1110     t.tv_sec = 1;
1111
1112     //do something for one second
1113     if (timedWait(t)) return;
1114
1115     //enter new connectionID into the list
1116     mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
1117
1118     //send the ack
1119     mShadow->ackSetSourceState(mHandle, E_OK);
1120
1121     //destroy the handle
1122     mAsyncSender->removeHandleSafe(mHandle.handle);
1123 }
1124
1125 void am::asyncSetSourceStateWorker::cancelWork()
1126 {
1127     //send the ack
1128     mShadow->ackSetSourceState(mHandle, E_ABORTED);
1129
1130     //destroy the handle
1131     mAsyncSender->removeHandleSafe(mHandle.handle);
1132 }
1133
1134 am::asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1135         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(), mSinkID(sinkID), mSoundProperty(soundProperty)
1136 {
1137 }
1138
1139 void am::asyncSetSinkSoundPropertyWorker::start2work()
1140 {
1141     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1142     timespec t;
1143     t.tv_nsec = 0;
1144     t.tv_sec = 1;
1145
1146     //do something for one second
1147     if (timedWait(t)) return;
1148
1149     //enter new connectionID into the list
1150     mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
1151
1152     //send the ack
1153     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1154
1155     //destroy the handle
1156     mAsyncSender->removeHandleSafe(mHandle.handle);
1157 }
1158
1159 void am::asyncSetSinkSoundPropertyWorker::cancelWork()
1160 {
1161     //send the ack
1162     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1163
1164     //destroy the handle
1165     mAsyncSender->removeHandleSafe(mHandle.handle);
1166 }
1167
1168 am::asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1169         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(), mSourceID(sourceID), mSoundProperty(soundProperty)
1170 {
1171 }
1172
1173 void am::asyncSetSourceSoundPropertyWorker::start2work()
1174 {
1175     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1176     timespec t;
1177     t.tv_nsec = 0;
1178     t.tv_sec = 1;
1179
1180     //do something for one second
1181     if (timedWait(t)) return;
1182
1183     //enter new connectionID into the list
1184     mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
1185
1186     //send the ack
1187     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1188
1189     //destroy the handle
1190     mAsyncSender->removeHandleSafe(mHandle.handle);
1191 }
1192
1193 void am::asyncSetSourceSoundPropertyWorker::cancelWork()
1194 {
1195     //send the ack
1196     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1197
1198     //destroy the handle
1199     mAsyncSender->removeHandleSafe(mHandle.handle);
1200 }
1201
1202 am::asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1203         Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mDomainID(domainID), mDomainState(domainState)
1204 {
1205 }
1206
1207 void am::asyncDomainStateChangeWorker::start2work()
1208 {
1209     //todo: sendchanged data must be in here !
1210     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1211     timespec t;
1212     t.tv_nsec = 0;
1213     t.tv_sec = 1;
1214
1215     //do something for one second
1216     if (timedWait(t)) return;
1217
1218     //enter new connectionID into the list
1219     mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1220     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1221     //send the new status
1222
1223 }
1224
1225 void am::asyncDomainStateChangeWorker::cancelWork()
1226 {
1227     //send the new status
1228     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1229 }
1230