9a394226b2a5b89d0ad524db219fa11adb7219f4
[profile/ivi/audiomanager.git] / PluginRoutingInterfaceAsync / src / RoutingSenderAsync.cpp
1 /**
2  * Copyright (C) 2011, BMW AG
3  *
4  * GeniviAudioMananger DbusPlugin
5  *
6  * \file RoutingSender.cpp
7  *
8  * \date 20-Oct-2011 3:42:04 PM
9  * \author Christian Mueller (christian.ei.mueller@bmw.de)
10  *
11  * \section License
12  * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13  * Copyright (C) 2011, BMW AG Christian Mueller  Christian.ei.mueller@bmw.de
14  *
15  * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17  * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18  * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19  * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20  * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21  * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
22  *
23  */
24
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
27 #include <algorithm>
28 #include <vector>
29 #include <poll.h>
30 #include <errno.h>
31 #include <time.h>
32 #include <assert.h>
33 #include <sstream>
34 #include <string>
35 #include <dbus/dbus.h>
36
37 using namespace am;
38
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
40
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
42 {
43     return (new AsyncRoutingSender());
44 }
45
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
47 {
48     delete routingSendInterface;
49 }
50
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
57
58 void* AsyncRoutingSender::InterruptEvents(void *data)
59 {
60     RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
61     DBusError err;
62     DBusMessage* msg;
63     DBusConnection* conn;
64     dbus_error_init(&err);
65     conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
66     dbus_uint32_t serial = 0;
67     DBusMessage* reply;
68     DBusMessageIter args;
69     dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
70
71     while (dbus_connection_read_write_dispatch(conn, -1))
72     {
73         dbus_connection_read_write(conn, 0);
74         msg = dbus_connection_pop_message(conn);
75
76         if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
77         {
78             am_connectionID_t connectionID;
79             am_timeSync_t delay;
80             dbus_message_iter_init(msg, &args);
81             dbus_message_iter_get_basic(&args,(void*) &connectionID);
82             dbus_message_iter_next(&args);
83             dbus_message_iter_get_basic(&args,(void*) &delay);
84             reply = dbus_message_new_method_return(msg);
85             dbus_message_iter_init_append(reply, &args);
86             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
87             dbus_connection_send(conn, reply, &serial);
88             shadow->hookTimingInformationChanged(connectionID,delay);
89             dbus_message_unref(reply);
90         }
91         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
92         {
93             am_sinkID_t sinkID;
94             am_Availability_s availability;
95             dbus_message_iter_init(msg, &args);
96             dbus_message_iter_get_basic(&args,(void*) &sinkID);
97             reply = dbus_message_new_method_return(msg);
98             dbus_message_iter_init_append(reply, &args);
99             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
100             dbus_connection_send(conn, reply, &serial);
101             shadow->hookSinkAvailablityStatusChange(sinkID,availability);
102             dbus_message_unref(reply);
103         }
104         else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
105         {
106             am_sourceID_t sourceID;
107             am_Availability_s availability;
108             dbus_message_iter_init(msg, &args);
109             dbus_message_iter_get_basic(&args,(void*) &sourceID);
110             reply = dbus_message_new_method_return(msg);
111             dbus_message_iter_init_append(reply, &args);
112             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
113             dbus_connection_send(conn, reply, &serial);
114             shadow->hookSourceAvailablityStatusChange(sourceID,availability);
115             dbus_message_unref(reply);
116         }
117         else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
118         {
119             am_sourceID_t sourceID;
120
121             am_InterruptState_e state=IS_MIN;
122             dbus_message_iter_init(msg, &args);
123             dbus_message_iter_get_basic(&args,(void*) &sourceID);
124             reply = dbus_message_new_method_return(msg);
125             dbus_message_iter_init_append(reply, &args);
126             dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
127             dbus_connection_send(conn, reply, &serial);
128             shadow->hookInterruptStatusChange(sourceID,state);
129             dbus_message_unref(reply);
130         }
131         dbus_connection_flush(conn);
132     }
133     return NULL;
134 }
135
136 void *WorkerThreadPool::WorkerThread(void* data)
137 {
138     threadInfo_s *myInfo=(threadInfo_s*)data;
139     while (1)
140     {
141         sem_wait(&myInfo->block);
142         pthread_mutex_lock(&mBlockingMutex);
143         Worker* actWorker=myInfo->worker;
144         pthread_mutex_unlock(&mBlockingMutex);
145         actWorker->setCancelSempaphore(&myInfo->cancel);
146         actWorker->start2work();
147         actWorker->pPool->finishedWork(myInfo->threadID);
148     }
149     return NULL;
150 }
151
152 WorkerThreadPool::WorkerThreadPool(int numThreads):
153 mNumThreads(numThreads)
154 {
155     int workerID=0;
156     mListWorkers.resize(mNumThreads);
157     for (int i=0;i<mNumThreads;i++)
158     {
159         sem_init(&mListWorkers[i].block,NULL,NULL);
160         sem_init(&mListWorkers[i].cancel,NULL,NULL);
161         mListWorkers[i].busy=false;
162         mListWorkers[i].workerID=++workerID;
163         pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
164     }
165 }
166
167 int16_t WorkerThreadPool::startWork(Worker *worker)
168 {
169     pthread_mutex_lock(&mBlockingMutex);
170     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
171     for(;it!=mListWorkers.end();++it)
172     {
173         if(!it->busy)
174         {
175             it->worker=worker;
176             it->busy=true;
177             pthread_mutex_unlock(&mBlockingMutex);
178             sem_post(&it->block);
179             return ((int)it->workerID);
180         }
181     }
182     pthread_mutex_unlock(&mBlockingMutex);
183     return (-1);
184 }
185
186 bool WorkerThreadPool::cancelWork(int workerID)
187 {
188     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
189     for(;it!=mListWorkers.end();++it)
190     {
191         if(it->workerID==workerID && it->busy)
192         {
193             sem_post(&it->cancel);
194             return (true);
195         }
196     }
197     return (false);
198 }
199
200 void WorkerThreadPool::finishedWork(pthread_t threadID)
201 {
202     pthread_mutex_lock(&mBlockingMutex);
203     std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
204     for(;it!=mListWorkers.end();++it)
205     {
206         if(it->threadID==threadID)
207         {
208             it->busy=false;
209             delete it->worker;
210             break;
211         }
212     }
213     pthread_mutex_unlock(&mBlockingMutex);
214 }
215
216 WorkerThreadPool::~WorkerThreadPool()
217 {
218     for (int i=0;i<mNumThreads;i++)
219     {
220         pthread_cancel(mListWorkers[i].threadID);
221     }
222 }
223
224 Worker::Worker(WorkerThreadPool *pool):
225 pPool(pool), //
226         mCancelSem()
227         {
228         }
229
230         void Worker::setCancelSempaphore(sem_t* cancel)
231         {
232             mCancelSem=cancel;
233         }
234
235         bool Worker::timedWait(timespec timer)
236         {
237             timespec temp;
238             if(clock_gettime(0, &temp)==-1)
239             {
240                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
241             }
242             temp.tv_nsec+=timer.tv_nsec;
243             temp.tv_sec+=timer.tv_sec;
244             //if(sem_wait(mCancelSem)==-1)
245         if (sem_timedwait(mCancelSem,&temp)==-1)
246         {
247             //a timeout happened
248             if(errno == ETIMEDOUT)
249             {
250                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
251                 return (false);
252             }
253             else //failure in waiting, nevertheless, we quit the thread...
254             {
255                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
256                 return (true);
257             }
258         }
259         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
260         this->cancelWork();
261         return (true);
262     }
263
264     AsyncRoutingSender::AsyncRoutingSender():
265     mShadow(), //
266     mReceiveInterface(0), //
267         mDomains(createDomainTable()), //
268         mSinks(createSinkTable()), //
269 mSources ( createSourceTable ( ) ), //
270 mGateways ( createGatewayTable ( ) ) , //
271 mMapHandleWorker ( ), //
272 mMapConnectionIDRoute(),//
273 mPool(10)
274 {
275     DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
276     DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
277 }
278
279 AsyncRoutingSender::~AsyncRoutingSender()
280 {
281 }
282
283 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
284 {
285     //first, create the Shadow:
286     assert(routingreceiveinterface!=0);
287     mReceiveInterface = routingreceiveinterface;
288     mShadow.setRoutingInterface(routingreceiveinterface);
289 }
290
291 void AsyncRoutingSender::routingInterfacesReady()
292 {
293     assert(mReceiveInterface!=0);
294     am_Error_e eCode;
295     //first register the domains
296     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
297     for (; domainIter != mDomains.end(); ++domainIter)
298     {
299         am_domainID_t domainID;
300         if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
301         {
302             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
303         }
304         domainIter->domainID = domainID;
305     }
306
307     //then sources
308     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
309     for (; sourceIter != mSources.end(); ++sourceIter)
310     {
311         am_sourceID_t sourceID;
312         //set the correct domainID
313         sourceIter->domainID = mDomains[0].domainID;
314         if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
315         {
316             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
317         }
318     }
319
320     //sinks
321     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
322     for (; sinkIter != mSinks.end(); ++sinkIter)
323     {
324         am_sinkID_t sinkID;
325         //set the correct domainID
326         sinkIter->domainID = mDomains[0].domainID;
327         if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
328         {
329             DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
330         }
331     }
332
333     //gateways
334 //      std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
335 //      for(;gatewayIter!=mGateways.end();++gatewayIter)
336 //      {
337 //              am_gatewayID_t gatewayID;
338 //              gatewayIter->domainSinkID=mDomains[0].domainID;
339 //              gatewayIter->domainSourceID=mDomains[1].domainID;
340 //              gatewayIter->controlDomainID=mDomains[0].domainID;
341 //              if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
342 //              {
343 //                      DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
344 //              }
345 //              gatewayIter->gatewayID=gatewayID;
346 //      }
347
348     //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
349     //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
350     //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
351 }
352
353 void AsyncRoutingSender::routingInterfacesRundown()
354 {
355     assert(mReceiveInterface!=0);
356 }
357
358 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
359 {
360     assert(mReceiveInterface!=0);
361     assert(handle.handle!=0);
362
363     //first check if we know the handle
364     pthread_mutex_lock(&mMapHandleWorkerMutex);
365     std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
366     if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
367     {
368         pthread_mutex_unlock(&mMapHandleWorkerMutex);
369         return (E_NON_EXISTENT);
370     }
371     pthread_mutex_unlock(&mMapHandleWorkerMutex);
372
373     //ok, cancel the action:
374     if (mPool.cancelWork(iter->second))
375         return (E_OK);
376     return (E_UNKNOWN);
377 }
378
379 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
380 {
381     assert(mReceiveInterface!=0);
382     assert(handle.handle!=0);
383     assert(handle.handleType==H_CONNECT);
384     assert(connectionID!=0);
385     assert(sinkID!=0);
386     assert(sourceID!=0);
387
388     //check if we can take the job
389     am_Sink_s sink;
390     am_Source_s source;
391     int16_t work = -1;
392
393     //find the sink
394     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
395     for (; sinkIter != mSinks.end(); ++sinkIter)
396     {
397         if (sinkIter->sinkID == sinkID)
398         {
399             sink = *sinkIter;
400             break;
401         }
402     }
403     if (sinkIter == mSinks.end())
404         return (E_NON_EXISTENT); //not found!
405
406     //find the source
407     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
408     for (; sourceIter != mSources.end(); ++sourceIter)
409     {
410         if (sourceIter->sourceID == sourceID)
411         {
412             source = *sourceIter;
413             break;
414         }
415     }
416     if (sourceIter == mSources.end())
417         return (E_NON_EXISTENT); //not found!
418
419     //check the format
420     if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end())
421         return (E_WRONG_FORMAT);
422     if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end())
423         return (E_WRONG_FORMAT);
424
425     //the operation is ok, lets create a worker, assign it to a task in the task pool
426     asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
427     if ((work = mPool.startWork(worker)) == -1)
428     {
429         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
430         delete worker;
431         return (E_NOT_POSSIBLE);
432     }
433
434     //save the handle related to the workerID
435     pthread_mutex_lock(&mMapHandleWorkerMutex);
436     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
437     pthread_mutex_unlock(&mMapHandleWorkerMutex);
438
439     return (E_OK);
440 }
441
442 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
443 {
444     assert(mReceiveInterface!=0);
445     assert(handle.handle!=0);
446     assert(handle.handleType==H_DISCONNECT);
447     assert(connectionID!=0);
448
449     //check if we can take the job
450     int16_t work = -1;
451
452     pthread_mutex_lock(&mMapConnectionMutex);
453     if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
454     {
455         pthread_mutex_unlock(&mMapConnectionMutex);
456         return (E_NON_EXISTENT);
457     }
458     pthread_mutex_unlock(&mMapConnectionMutex);
459
460     //the operation is ok, lets create a worker, assign it to a task in the task pool
461     asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
462     if ((work = mPool.startWork(worker)) == -1)
463     {
464         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
465         delete worker;
466         return (E_NOT_POSSIBLE);
467     }
468
469     //save the handle related to the workerID
470     pthread_mutex_lock(&mMapHandleWorkerMutex);
471     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
472     pthread_mutex_unlock(&mMapHandleWorkerMutex);
473
474     return (E_OK);
475 }
476
477 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
478 {
479     assert(mReceiveInterface!=0);
480     assert(handle.handle!=0);
481     assert(handle.handleType==H_SETSINKVOLUME);
482     assert(sinkID!=0);
483
484     //check if we can take the job
485     am_Sink_s sink;
486     int16_t work = -1;
487
488     //find the sink
489     pthread_mutex_lock(&mSinksMutex);
490     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
491     for (; sinkIter != mSinks.end(); ++sinkIter)
492     {
493         if (sinkIter->sinkID == sinkID)
494         {
495             sink = *sinkIter;
496             break;
497         }
498     }
499     pthread_mutex_unlock(&mSinksMutex);
500     if (sinkIter == mSinks.end())
501         return (E_NON_EXISTENT); //not found!
502
503     asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
504     if ((work = mPool.startWork(worker)) == -1)
505     {
506         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
507         delete worker;
508         return (E_NOT_POSSIBLE);
509     }
510
511     //save the handle related to the workerID
512     pthread_mutex_lock(&mMapHandleWorkerMutex);
513     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
514     pthread_mutex_unlock(&mMapHandleWorkerMutex);
515
516     return (E_OK);
517 }
518
519 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
520 {
521     assert(mReceiveInterface!=0);
522     assert(handle.handle!=0);
523     assert(handle.handleType==H_SETSOURCEVOLUME);
524     assert(sourceID!=0);
525
526     //check if we can take the job
527     am_Source_s source;
528     int16_t work = -1;
529
530     //find the sink
531     pthread_mutex_lock(&mSourcesMutex);
532     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
533     for (; sourceIter != mSources.end(); ++sourceIter)
534     {
535         if (sourceIter->sourceID == sourceID)
536         {
537             source = *sourceIter;
538             break;
539         }
540     }
541     pthread_mutex_unlock(&mSourcesMutex);
542     if (sourceIter == mSources.end())
543         return (E_NON_EXISTENT); //not found!
544
545     asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
546     if ((work = mPool.startWork(worker)) == -1)
547     {
548         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
549         delete worker;
550         return (E_NOT_POSSIBLE);
551     }
552
553     //save the handle related to the workerID
554     pthread_mutex_lock(&mMapHandleWorkerMutex);
555     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
556     pthread_mutex_unlock(&mMapHandleWorkerMutex);
557
558     return (E_OK);
559 }
560
561 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
562 {
563     assert(mReceiveInterface!=0);
564     assert(handle.handle!=0);
565     assert(handle.handleType==H_SETSOURCESTATE);
566     assert(sourceID!=0);
567
568     //check if we can take the job
569     am_Source_s source;
570     int16_t work = -1;
571
572     //find the source
573     pthread_mutex_lock(&mSourcesMutex);
574     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
575     for (; sourceIter != mSources.end(); ++sourceIter)
576     {
577         if (sourceIter->sourceID == sourceID)
578         {
579             source = *sourceIter;
580             break;
581         }
582     }
583     pthread_mutex_unlock(&mSourcesMutex);
584     if (sourceIter == mSources.end())
585         return (E_NON_EXISTENT); //not found!
586
587     asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
588     if ((work = mPool.startWork(worker)) == -1)
589     {
590         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
591         delete worker;
592         return (E_NOT_POSSIBLE);
593     }
594
595     //save the handle related to the workerID
596     pthread_mutex_lock(&mMapHandleWorkerMutex);
597     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
598     pthread_mutex_unlock(&mMapHandleWorkerMutex);
599
600     return (E_OK);
601 }
602
603 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_sinkID_t sinkID, const am_SoundProperty_s & soundProperty)
604 {
605     assert(mReceiveInterface!=0);
606     assert(handle.handle!=0);
607     assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
608     assert(sinkID!=0);
609
610     //check if we can take the job
611     am_Sink_s sink;
612     int16_t work = -1;
613
614     //find the sink
615     pthread_mutex_lock(&mSinksMutex);
616     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
617     for (; sinkIter != mSinks.end(); ++sinkIter)
618     {
619         if (sinkIter->sinkID == sinkID)
620         {
621             sink = *sinkIter;
622             break;
623         }
624     }
625     pthread_mutex_unlock(&mSinksMutex);
626     if (sinkIter == mSinks.end())
627         return (E_NON_EXISTENT); //not found!
628
629     asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
630     if ((work = mPool.startWork(worker)) == -1)
631     {
632         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
633         delete worker;
634         return (E_NOT_POSSIBLE);
635     }
636
637     //save the handle related to the workerID
638     pthread_mutex_lock(&mMapHandleWorkerMutex);
639     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
640     pthread_mutex_unlock(&mMapHandleWorkerMutex);
641
642     return (E_OK);
643 }
644
645 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
646 {
647     //todo: implement crossfader
648     (void) handle;
649     (void) crossfaderID;
650     (void) hotSink;
651     (void) rampType;
652     (void) time;
653     return E_NOT_USED;
654 }
655
656 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
657 {
658     assert(mReceiveInterface!=0);
659     assert(domainID!=0);
660
661     //check if we can take the job
662     am_Domain_s domain;
663     int16_t work = -1;
664
665     //find the sink
666     pthread_mutex_lock(&mDomainsMutex);
667     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
668     for (; domainIter != mDomains.end(); ++domainIter)
669     {
670         if (domainIter->domainID == domainID)
671         {
672             domain = *domainIter;
673             break;
674         }
675     }
676     pthread_mutex_unlock(&mDomainsMutex);
677     if (domainIter == mDomains.end())
678         return (E_NON_EXISTENT); //not found!
679
680     asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
681     if ((work = mPool.startWork(worker)) == -1)
682     {
683         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
684         delete worker;
685         return (E_NOT_POSSIBLE);
686     }
687
688     return (E_OK);
689
690 }
691
692 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SoundProperty_s & soundProperty)
693 {
694     assert(mReceiveInterface!=0);
695     assert(handle.handle!=0);
696     assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
697     assert(sourceID!=0);
698
699     //check if we can take the job
700     am_Source_s source;
701     int16_t work = -1;
702
703     //find the source
704     pthread_mutex_lock(&mSourcesMutex);
705     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
706     for (; sourceIter != mSources.end(); ++sourceIter)
707     {
708         if (sourceIter->sourceID == sourceID)
709         {
710             source = *sourceIter;
711             break;
712         }
713     }
714     pthread_mutex_unlock(&mSourcesMutex);
715     if (sourceIter == mSources.end())
716         return (E_NON_EXISTENT); //not found!
717
718     asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
719     if ((work = mPool.startWork(worker)) == -1)
720     {
721         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
722         delete worker;
723         return (E_NOT_POSSIBLE);
724     }
725
726     //save the handle related to the workerID
727     pthread_mutex_lock(&mMapHandleWorkerMutex);
728     mMapHandleWorker.insert(std::make_pair(handle.handle, work));
729     pthread_mutex_unlock(&mMapHandleWorkerMutex);
730
731     return (E_OK);
732 }
733
734 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
735 {
736     BusName = "RoutingAsync";
737     return (E_OK);
738 }
739
740 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
741 {
742     //just write two domains into the table and return it
743     std::vector<am_Domain_s> table;
744     am_Domain_s item;
745     item.busname = "RoutingAsync";
746     item.domainID = 0;
747     item.early = false;
748     item.name = "AsyncDomain1";
749     item.nodename = "AsyncNode1";
750     item.state = DS_CONTROLLED;
751     table.push_back(item);
752     item.busname = "RoutingAsync";
753     item.domainID = 0;
754     item.early = false;
755     item.name = "AsyncDomain2";
756     item.nodename = "AsyncNode2";
757     item.state = DS_CONTROLLED;
758     table.push_back(item);
759     return (table);
760 }
761
762 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
763 {
764     //create a bunch full of sinks
765     std::vector<am_Sink_s> table;
766     am_Sink_s item;
767     am_SoundProperty_s sp;
768     sp.type = SP_BASS;
769     sp.value = 0;
770     for (int16_t i = 0; i <= 10; i++)
771     {
772         std::stringstream temp;
773         temp << i;
774         item.domainID = 0; //we cannot know this when the table is created !
775         item.name = "mySink" + temp.str();
776         item.sinkID = i; //take fixed ids to make thins easy
777         item.sinkClassID = 1;
778         item.volume = 0;
779         item.listSoundProperties.push_back(sp);
780         item.visible = true;
781         item.listConnectionFormats.push_back(CF_ANALOG);
782         table.push_back(item);
783     }
784     return (table);
785 }
786
787 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
788 {
789     //create a bunch full of sources
790     std::vector<am_Source_s> table;
791     am_Source_s item;
792     for (int16_t i = 0; i <= 10; i++)
793     {
794         std::stringstream temp;
795         temp << i;
796         item.domainID = 0; //we cannot know this when the table is created !
797         item.name = "mySource" + temp.str();
798         item.sourceID = i; //take fixed ids to make thins easy
799         item.sourceClassID = 1;
800         item.volume = 0;
801         item.visible = true;
802         item.listConnectionFormats.push_back(CF_ANALOG);
803         table.push_back(item);
804     }
805     return (table);
806 }
807
808 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
809 {
810     pthread_mutex_lock(&mMapConnectionMutex);
811     mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
812     pthread_mutex_unlock(&mMapConnectionMutex);
813 }
814
815 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
816 {
817     pthread_mutex_lock(&mMapHandleWorkerMutex);
818     if (mMapHandleWorker.erase(handle))
819     {
820         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
821     }
822     pthread_mutex_unlock(&mMapHandleWorkerMutex);
823 }
824
825 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
826 {
827     pthread_mutex_lock(&mMapConnectionMutex);
828     if (mMapConnectionIDRoute.erase(connectionID))
829     {
830         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
831     }
832     pthread_mutex_unlock(&mMapConnectionMutex);
833 }
834
835 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
836 {
837     pthread_mutex_lock(&mSinksMutex);
838     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
839     for (; sinkIter != mSinks.end(); ++sinkIter)
840     {
841         if (sinkIter->sinkID == sinkID)
842         {
843             sinkIter->volume = volume;
844             break;
845         }
846     }
847     pthread_mutex_unlock(&mSinksMutex);
848 }
849
850 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
851 {
852     pthread_mutex_lock(&mSourcesMutex);
853     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
854     for (; sourceIter != mSources.end(); ++sourceIter)
855     {
856         if (sourceIter->sourceID == sourceID)
857         {
858             sourceIter->volume = volume;
859             break;
860         }
861     }
862     pthread_mutex_unlock(&mSourcesMutex);
863 }
864
865 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
866 {
867     pthread_mutex_lock(&mSourcesMutex);
868     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
869     for (; sourceIter != mSources.end(); ++sourceIter)
870     {
871         if (sourceIter->sourceID == sourceID)
872         {
873             sourceIter->sourceState = state;
874             break;
875         }
876     }
877     pthread_mutex_unlock(&mSourcesMutex);
878 }
879
880 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
881 {
882     pthread_mutex_lock(&mSinksMutex);
883     std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
884     for (; sinkIter != mSinks.end(); ++sinkIter)
885     {
886         if (sinkIter->sinkID == sinkID)
887         {
888             std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
889             for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
890             {
891                 if (spIterator->type == soundProperty.type)
892                 {
893                     spIterator->value = soundProperty.value;
894                     break;
895                 }
896             }
897         }
898     }
899     pthread_mutex_unlock(&mSinksMutex);
900 }
901
902 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
903 {
904     pthread_mutex_lock(&mSourcesMutex);
905     std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
906     for (; sourceIter != mSources.end(); ++sourceIter)
907     {
908         if (sourceIter->sourceID == sourceID)
909         {
910             std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
911             for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
912             {
913                 if (spIterator->type == soundProperty.type)
914                 {
915                     spIterator->value = soundProperty.value;
916                     break;
917                 }
918             }
919         }
920     }
921     pthread_mutex_unlock(&mSourcesMutex);
922 }
923
924 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
925 {
926     pthread_mutex_lock(&mDomainsMutex);
927     std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
928     for (; domainIter != mDomains.end(); ++domainIter)
929     {
930         if (domainIter->domainID == domainID)
931         {
932             domainIter->state = domainState;
933             break;
934         }
935     }
936     pthread_mutex_unlock(&mDomainsMutex);
937 }
938
939 uint16_t AsyncRoutingSender::getInterfaceVersion() const
940 {
941     return (RoutingSendVersion);
942 }
943
944 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const am_sourceID_t sourceID, const std::vector<am_SoundProperty_s> & listSoundProperties)
945 {
946     //todo: implement
947     (void) handle;
948     (void) sourceID;
949     (void) listSoundProperties;
950     return (E_NOT_USED);
951 }
952
953 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const am_sinkID_t sinkID, const std::vector<am_SoundProperty_s> & listSoundProperties)
954 {
955     //todo: implement
956     (void) handle;
957     (void) sinkID;
958     (void) listSoundProperties;
959     return (E_NOT_USED);
960 }
961
962 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
963 {
964     std::vector<am_Gateway_s> table;
965     am_Gateway_s item;
966     item.name = "myGateway";
967     item.sinkID = 2;
968     item.sourceID = 2;
969     table.push_back(item);
970     return (table);
971 }
972
973 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
974         Worker(pool), //
975         mAsyncSender(asyncSender), //
976         mShadow(shadow), //
977         mHandle(handle), //
978         mConnectionID(connectionID), //
979         mSourceID(sourceID), //
980         mSinkID(sinkID), //
981         mConnectionFormat(connectionFormat)
982 {
983 }
984
985 void asycConnectWorker::start2work()
986 {
987     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start connecting"));
988     timespec t;
989     t.tv_nsec = 0;
990     t.tv_sec = 1;
991
992     //do something for one second
993     if (timedWait(t))
994         return;
995     am_RoutingElement_s route;
996     route.sinkID = mSinkID;
997     route.sourceID = mSourceID;
998     route.connectionFormat = mConnectionFormat;
999
1000     //enter new connectionID into the list
1001     mAsyncSender->insertConnectionSafe(mConnectionID, route);
1002
1003     //send the ack
1004     mShadow->ackConnect(mHandle, mConnectionID, E_OK);
1005
1006     //destroy the handle
1007     mAsyncSender->removeHandleSafe(mHandle.handle);
1008 }
1009
1010 void asycConnectWorker::cancelWork()
1011 {
1012     mAsyncSender->removeHandleSafe(mHandle.handle);
1013     mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
1014 }
1015
1016 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
1017         Worker(pool), //
1018         mAsyncSender(asyncSender), //
1019         mShadow(shadow), //
1020         mHandle(handle), //
1021         mConnectionID(connectionID)
1022 {
1023 }
1024
1025 void asycDisConnectWorker::start2work()
1026 {
1027     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
1028     timespec t;
1029     t.tv_nsec = 0;
1030     t.tv_sec = 1;
1031
1032     //do something for one second
1033     if (timedWait(t))
1034         return;
1035     am_RoutingElement_s route;
1036
1037     //enter new connectionID into the list
1038     mAsyncSender->insertConnectionSafe(mConnectionID, route);
1039
1040     //send the ack
1041     mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1042
1043     //destroy the handle
1044     mAsyncSender->removeHandleSafe(mHandle.handle);
1045
1046 }
1047
1048 void asycDisConnectWorker::cancelWork()
1049 {
1050     mAsyncSender->removeHandleSafe(mHandle.handle);
1051     mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
1052 }
1053
1054 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1055         Worker(pool), //
1056         mAsyncSender(asyncSender), //
1057         mShadow(shadow), //
1058         mCurrentVolume(currentVolume), //
1059         mHandle(handle), //
1060         mSinkID(sinkID), //
1061         mVolume(volume), //
1062         mRamp(ramp), //
1063         mTime(time)
1064 {
1065 }
1066
1067 void asyncSetSinkVolumeWorker::start2work()
1068 {
1069     //todo: this implementation does not respect time and ramp....
1070     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
1071     timespec t;
1072     t.tv_nsec = 10000000;
1073     t.tv_sec = 0;
1074
1075     while (mCurrentVolume != mVolume)
1076     {
1077         if (mCurrentVolume < mVolume)
1078             mCurrentVolume++;
1079         else
1080             mCurrentVolume--;
1081         mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
1082         if (timedWait(t))
1083             return;
1084     }
1085
1086     //enter new connectionID into the list
1087     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1088
1089     //send the ack
1090     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1091
1092     //destroy the handle
1093     mAsyncSender->removeHandleSafe(mHandle.handle);
1094 }
1095
1096 void asyncSetSinkVolumeWorker::cancelWork()
1097 {
1098     mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1099     mAsyncSender->removeHandleSafe(mHandle.handle);
1100     mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1101 }
1102
1103 asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1104         Worker(pool), //
1105         mAsyncSender(asyncSender), //
1106         mShadow(shadow), //
1107         mCurrentVolume(currentVolume), //
1108         mHandle(handle), //
1109         mSourceID(SourceID), //
1110         mVolume(volume), //
1111         mRamp(ramp), //
1112         mTime(time)
1113 {
1114 }
1115
1116 void asyncSetSourceVolumeWorker::start2work()
1117 {
1118     //todo: this implementation does not respect time and ramp....
1119     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
1120     timespec t;
1121     t.tv_nsec = 10000000;
1122     t.tv_sec = 0;
1123
1124     while (mCurrentVolume != mVolume)
1125     {
1126         if (mCurrentVolume < mVolume)
1127             mCurrentVolume++;
1128         else
1129             mCurrentVolume--;
1130         mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
1131         if (timedWait(t))
1132             return;
1133     }
1134
1135     //enter new connectionID into the list
1136     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1137
1138     //send the ack
1139     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1140
1141     //destroy the handle
1142     mAsyncSender->removeHandleSafe(mHandle.handle);
1143 }
1144
1145 void asyncSetSourceVolumeWorker::cancelWork()
1146 {
1147     mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1148     mAsyncSender->removeHandleSafe(mHandle.handle);
1149     mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1150 }
1151
1152 asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1153         Worker(pool), //
1154         mAsyncSender(asyncSender), //
1155         mShadow(shadow), //
1156         mHandle(handle), //
1157         mSourceID(sourceID), //
1158         mSourcestate(state)
1159 {
1160 }
1161
1162 void asyncSetSourceStateWorker::start2work()
1163 {
1164     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1165     timespec t;
1166     t.tv_nsec = 0;
1167     t.tv_sec = 1;
1168
1169     //do something for one second
1170     if (timedWait(t))
1171         return;
1172
1173     //enter new connectionID into the list
1174     mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
1175
1176     //send the ack
1177     mShadow->ackSetSourceState(mHandle, E_OK);
1178
1179     //destroy the handle
1180     mAsyncSender->removeHandleSafe(mHandle.handle);
1181 }
1182
1183 void asyncSetSourceStateWorker::cancelWork()
1184 {
1185     //send the ack
1186     mShadow->ackSetSourceState(mHandle, E_ABORTED);
1187
1188     //destroy the handle
1189     mAsyncSender->removeHandleSafe(mHandle.handle);
1190 }
1191
1192 asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1193         Worker(pool), //
1194         mAsyncSender(asyncSender), //
1195         mShadow(shadow), //
1196         mHandle(handle), //
1197         mSinkID(sinkID), //
1198         mSoundProperty(soundProperty)
1199 {
1200 }
1201
1202 void asyncSetSinkSoundPropertyWorker::start2work()
1203 {
1204     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1205     timespec t;
1206     t.tv_nsec = 0;
1207     t.tv_sec = 1;
1208
1209     //do something for one second
1210     if (timedWait(t))
1211         return;
1212
1213     //enter new connectionID into the list
1214     mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
1215
1216     //send the ack
1217     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1218
1219     //destroy the handle
1220     mAsyncSender->removeHandleSafe(mHandle.handle);
1221 }
1222
1223 void asyncSetSinkSoundPropertyWorker::cancelWork()
1224 {
1225     //send the ack
1226     mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1227
1228     //destroy the handle
1229     mAsyncSender->removeHandleSafe(mHandle.handle);
1230 }
1231
1232 asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1233         Worker(pool), //
1234         mAsyncSender(asyncSender), //
1235         mShadow(shadow), //
1236         mHandle(handle), //
1237         mSourceID(sourceID), //
1238         mSoundProperty(soundProperty)
1239 {
1240 }
1241
1242 void asyncSetSourceSoundPropertyWorker::start2work()
1243 {
1244     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1245     timespec t;
1246     t.tv_nsec = 0;
1247     t.tv_sec = 1;
1248
1249     //do something for one second
1250     if (timedWait(t))
1251         return;
1252
1253     //enter new connectionID into the list
1254     mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
1255
1256     //send the ack
1257     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1258
1259     //destroy the handle
1260     mAsyncSender->removeHandleSafe(mHandle.handle);
1261 }
1262
1263 void asyncSetSourceSoundPropertyWorker::cancelWork()
1264 {
1265     //send the ack
1266     mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1267
1268     //destroy the handle
1269     mAsyncSender->removeHandleSafe(mHandle.handle);
1270 }
1271
1272 asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1273         Worker(pool), //
1274         mAsyncSender(asyncSender), //
1275         mShadow(shadow), //
1276         mDomainID(domainID), //
1277         mDomainState(domainState)
1278 {
1279 }
1280
1281 void asyncDomainStateChangeWorker::start2work()
1282 {
1283     //todo: sendchanged data must be in here !
1284     DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1285     timespec t;
1286     t.tv_nsec = 0;
1287     t.tv_sec = 1;
1288
1289     //do something for one second
1290     if (timedWait(t))
1291         return;
1292
1293     //enter new connectionID into the list
1294     mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1295     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1296     //send the new status
1297
1298 }
1299
1300 void am::asyncDomainStateChangeWorker::cancelWork()
1301 {
1302     //send the new status
1303     mShadow->hookDomainStateChange(mDomainID, mDomainState);
1304 }
1305