* assert on empty busname (routinginterface)
[profile/ivi/audiomanager.git] / PluginRoutingInterfaceAsync / src / RoutingSenderAsync.cpp
1 /**
2 * Copyright (C) 2011, BMW AG
3 *
4 * GeniviAudioMananger DbusPlugin
5 *
6 * \file RoutingSender.cpp
7 *
8 * \date 20-Oct-2011 3:42:04 PM
9 * \author Christian Mueller (christian.ei.mueller@bmw.de)
10 *
11 * \section License
12 * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13 * Copyright (C) 2011, BMW AG Christian Mueller  Christian.ei.mueller@bmw.de
14 *
15 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17 * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18 * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19 * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20 * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21 * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
22 *
23 */
24
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
27 #include <algorithm>
28 #include <vector>
29 #include <poll.h>
30 #include <errno.h>
31 #include <time.h>
32 #include <assert.h>
33 #include <sstream>
34 #include <string>
35 #include <dbus/dbus.h>
36
37 using namespace am;
38
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
40
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory() {
42     return (new AsyncRoutingSender());
43 }
44
45 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface) {
46     delete routingSendInterface;
47 }
48
49 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
50 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
51 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
55
56
57 void* AsyncRoutingSender::InterruptEvents(void *data)
58 {
59         //This is a very very very basic implementation of the dbus interface
60         //there is not failure handling, nothing.
61         //it is used just for testing, not intended to be used otherwise...
62         RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
63         DBusError err;
64         DBusMessage* msg;
65         DBusConnection* conn;
66         dbus_error_init(&err);
67         conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
68         dbus_uint32_t serial = 0;
69         DBusMessage* reply;
70         DBusMessageIter args;
71         int answer = dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
72
73         while (dbus_connection_read_write_dispatch(conn, -1))
74         {
75                 dbus_connection_read_write(conn, 0);
76                 msg = dbus_connection_pop_message(conn);
77
78                 if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
79                 {
80                         am_connectionID_t connectionID;
81                         am_timeSync_t delay;
82                         dbus_message_iter_init(msg, &args);
83                         dbus_message_iter_get_basic(&args,(void*) &connectionID);
84                         dbus_message_iter_next(&args);
85                         dbus_message_iter_get_basic(&args,(void*) &delay);
86                         reply = dbus_message_new_method_return(msg);
87                         dbus_message_iter_init_append(reply, &args);
88                         dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
89                         dbus_connection_send(conn, reply, &serial);
90                         shadow->hookTimingInformationChanged(connectionID,delay);
91                         dbus_message_unref(reply);
92                 }
93                 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
94                 {
95                         am_sinkID_t sinkID;
96                         am_timeSync_t delay;
97                         am_Availability_s availability;
98                         dbus_message_iter_init(msg, &args);
99                         dbus_message_iter_get_basic(&args,(void*) &sinkID);
100                         reply = dbus_message_new_method_return(msg);
101                         dbus_message_iter_init_append(reply, &args);
102                         dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
103                         dbus_connection_send(conn, reply, &serial);
104                         shadow->hookSinkAvailablityStatusChange(sinkID,availability);
105                         dbus_message_unref(reply);
106                 }
107                 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
108                 {
109                         am_sourceID_t sourceID;
110                         am_timeSync_t delay;
111                         am_Availability_s availability;
112                         dbus_message_iter_init(msg, &args);
113                         dbus_message_iter_get_basic(&args,(void*) &sourceID);
114                         reply = dbus_message_new_method_return(msg);
115                         dbus_message_iter_init_append(reply, &args);
116                         dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
117                         dbus_connection_send(conn, reply, &serial);
118                         shadow->hookSourceAvailablityStatusChange(sourceID,availability);
119                         dbus_message_unref(reply);
120                 }
121                 else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
122                 {
123                         am_sourceID_t sourceID;
124
125                         am_InterruptState_e state;
126                         dbus_message_iter_init(msg, &args);
127                         dbus_message_iter_get_basic(&args,(void*) &sourceID);
128                         reply = dbus_message_new_method_return(msg);
129                         dbus_message_iter_init_append(reply, &args);
130                         dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
131                         dbus_connection_send(conn, reply, &serial);
132                         shadow->hookInterruptStatusChange(sourceID,state);
133                         dbus_message_unref(reply);
134                 }
135                 dbus_connection_flush(conn);
136         }
137 }
138
139
140
141 void *WorkerThreadPool::WorkerThread(void* data)
142 {
143         threadInfo_s *myInfo=(threadInfo_s*)data;
144         while (1)
145         {
146                 sem_wait(&myInfo->block);
147                 pthread_mutex_lock(&mBlockingMutex);
148                 Worker* actWorker=myInfo->worker;
149                 pthread_mutex_unlock(&mBlockingMutex);
150                 actWorker->setCancelSempaphore(&myInfo->cancel);
151                 actWorker->start2work();
152                 actWorker->pPool->finishedWork(myInfo->threadID);
153         }
154 }
155
156 WorkerThreadPool::WorkerThreadPool(int numThreads)
157         :mNumThreads(numThreads)
158 {
159         int workerID=0;
160         mListWorkers.resize(mNumThreads);
161         for (int i=0;i<mNumThreads;i++)
162         {
163                 sem_init(&mListWorkers[i].block,NULL,NULL);
164                 sem_init(&mListWorkers[i].cancel,NULL,NULL);
165                 mListWorkers[i].busy=false;
166                 mListWorkers[i].workerID=++workerID;
167                 pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
168         }
169 }
170
171 int16_t WorkerThreadPool::startWork(Worker *worker)
172 {
173         pthread_mutex_lock(&mBlockingMutex);
174         std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
175         for(;it!=mListWorkers.end();++it)
176         {
177                 if(!it->busy)
178                 {
179                         it->worker=worker;
180                         it->busy=true;
181                         pthread_mutex_unlock(&mBlockingMutex);
182                         sem_post(&it->block);
183                         return ((int)it->workerID);
184                 }
185         }
186         pthread_mutex_unlock(&mBlockingMutex);
187         return (-1);
188 }
189
190 bool WorkerThreadPool::cancelWork(int workerID)
191 {
192         std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
193         for(;it!=mListWorkers.end();++it)
194         {
195                 if(it->workerID==workerID && it->busy)
196                 {
197                         sem_post(&it->cancel);
198                         return (true);
199                 }
200         }
201         return (false);
202 }
203
204 void WorkerThreadPool::finishedWork(pthread_t threadID)
205 {
206         pthread_mutex_lock(&mBlockingMutex);
207         std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
208         for(;it!=mListWorkers.end();++it)
209         {
210                 if(it->threadID==threadID)
211                 {
212                         it->busy=false;
213                         delete it->worker;
214                         break;
215                 }
216         }
217         pthread_mutex_unlock(&mBlockingMutex);
218 }
219
220
221
222 WorkerThreadPool::~WorkerThreadPool()
223 {
224         for (int i=0;i<mNumThreads;i++)
225         {
226                 pthread_cancel(mListWorkers[i].threadID);
227         }
228 }
229
230 Worker::Worker(WorkerThreadPool *pool)
231 :pPool(pool),
232  mCancelSem()
233 {
234 }
235
236 void Worker::setCancelSempaphore(sem_t* cancel)
237 {
238         mCancelSem=cancel;
239 }
240
241
242
243 bool Worker::timedWait(timespec timer)
244 {
245         timespec temp;
246         if(clock_gettime(0, &temp)==-1)
247         {
248                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
249         }
250         temp.tv_nsec+=timer.tv_nsec;
251         temp.tv_sec+=timer.tv_sec;
252         //if(sem_wait(mCancelSem)==-1)
253         if (sem_timedwait(mCancelSem,&temp)==-1)
254         {
255                 //a timeout happened
256                 if(errno == ETIMEDOUT)
257                 {
258                         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
259                         return (false);
260                 }
261                 else //failure in waiting, nevertheless, we quit the thread...
262                 {
263                         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
264                         return (true);
265                 }
266         }
267         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
268         this->cancelWork();
269         return (true);
270 }
271
272 AsyncRoutingSender::AsyncRoutingSender()
273         :mShadow(),
274          mReceiveInterface(0),
275          mDomains(createDomainTable()),
276          mSinks(createSinkTable()),
277          mSources(createSourceTable()),
278          mGateways(createGatewayTable()),
279          mMapHandleWorker(),
280          mMapConnectionIDRoute(),
281          mPool(10)
282 {
283         DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
284         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
285 }
286
287
288
289 AsyncRoutingSender::~AsyncRoutingSender()
290 {
291 }
292
293
294
295 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
296 {
297         //first, create the Shadow:
298         assert(routingreceiveinterface!=0);
299         mReceiveInterface=routingreceiveinterface;
300         mShadow.setRoutingInterface(routingreceiveinterface);
301 }
302
303
304
305 void AsyncRoutingSender::routingInterfacesReady()
306 {
307         assert(mReceiveInterface!=0);
308         am_Error_e eCode;
309         //first register the domains
310         std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
311         for(;domainIter!=mDomains.end();++domainIter)
312         {
313                 am_domainID_t domainID;
314                 if((eCode=mReceiveInterface->registerDomain(*domainIter,domainID))!=E_OK)
315                 {
316                         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
317                 }
318                 domainIter->domainID=domainID;
319         }
320
321         //then sources
322         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
323         for(;sourceIter!=mSources.end();++sourceIter)
324         {
325                 am_sourceID_t sourceID;
326                 //set the correct domainID
327                 sourceIter->domainID=mDomains[0].domainID;
328                 if((eCode=mReceiveInterface->registerSource(*sourceIter,sourceID))!=E_OK)
329                 {
330                         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
331                 }
332         }
333
334         //sinks
335         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
336         for(;sinkIter!=mSinks.end();++sinkIter)
337         {
338                 am_sinkID_t sinkID;
339                 //set the correct domainID
340                 sinkIter->domainID=mDomains[0].domainID;
341                 if((eCode=mReceiveInterface->registerSink(*sinkIter,sinkID))!=E_OK)
342                 {
343                         DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
344                 }
345         }
346
347         //gateways
348 //      std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
349 //      for(;gatewayIter!=mGateways.end();++gatewayIter)
350 //      {
351 //              am_gatewayID_t gatewayID;
352 //              gatewayIter->domainSinkID=mDomains[0].domainID;
353 //              gatewayIter->domainSourceID=mDomains[1].domainID;
354 //              gatewayIter->controlDomainID=mDomains[0].domainID;
355 //              if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
356 //              {
357 //                      DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
358 //              }
359 //              gatewayIter->gatewayID=gatewayID;
360 //      }
361
362         //create thread for interrupts:
363         pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
364 }
365
366
367
368 void AsyncRoutingSender::routingInterfacesRundown()
369 {
370         assert(mReceiveInterface!=0);
371 }
372
373
374
375 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
376 {
377         assert(mReceiveInterface!=0);
378         assert(handle.handle!=0);
379
380         //first check if we know the handle
381         pthread_mutex_lock(&mMapHandleWorkerMutex);
382         std::map<uint16_t,int16_t>::iterator iter=mMapHandleWorker.begin();
383         if(mMapHandleWorker.find(handle.handle)==mMapHandleWorker.end())
384         {
385                 pthread_mutex_unlock(&mMapHandleWorkerMutex);
386                 return (E_NON_EXISTENT);
387         }
388         pthread_mutex_unlock(&mMapHandleWorkerMutex);
389
390
391         //ok, cancel the action:
392         if(mPool.cancelWork(iter->second)) return (E_OK);
393         return (E_UNKNOWN);
394 }
395
396
397
398 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
399 {
400         assert(mReceiveInterface!=0);
401         assert(handle.handle!=0);
402         assert(handle.handleType==H_CONNECT);
403         assert(connectionID!=0);
404         assert(sinkID!=0);
405         assert(sourceID!=0);
406
407         //check if we can take the job
408         am_Sink_s sink;
409         am_Source_s source;
410         int16_t work=-1;
411
412         //find the sink
413         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
414         for(;sinkIter!=mSinks.end();++sinkIter)
415         {
416                 if(sinkIter->sinkID==sinkID)
417                 {
418                         sink=*sinkIter;
419                         break;
420                 }
421         }
422         if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
423
424         //find the source
425         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
426         for(;sourceIter!=mSources.end();++sourceIter)
427         {
428                 if(sourceIter->sourceID==sourceID)
429                 {
430                         source=*sourceIter;
431                         break;
432                 }
433         }
434         if(sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
435
436         //check the format
437         if(std::find(source.listConnectionFormats.begin(),source.listConnectionFormats.end(),connectionFormat)==source.listConnectionFormats.end()) return (E_WRONG_FORMAT);
438         if(std::find(sink.listConnectionFormats.begin(),sink.listConnectionFormats.end(),connectionFormat)==sink.listConnectionFormats.end()) return (E_WRONG_FORMAT);
439
440         //the operation is ok, lets create a worker, assign it to a task in the task pool
441         asycConnectWorker *worker=new asycConnectWorker(this,&mPool,&mShadow,handle,connectionID,sourceID,sinkID,connectionFormat);
442         if((work=mPool.startWork(worker))==-1)
443         {
444                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
445                 delete worker;
446                 return (E_NOT_POSSIBLE);
447         }
448
449         //save the handle related to the workerID
450         pthread_mutex_lock(&mMapHandleWorkerMutex);
451         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
452         pthread_mutex_unlock(&mMapHandleWorkerMutex);
453
454         return (E_OK);
455 }
456
457
458
459 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
460 {
461         assert(mReceiveInterface!=0);
462         assert(handle.handle!=0);
463         assert(handle.handleType==H_DISCONNECT);
464         assert(connectionID!=0);
465
466         //check if we can take the job
467         int16_t work=-1;
468
469         pthread_mutex_lock(&mMapConnectionMutex);
470         if (mMapConnectionIDRoute.find(connectionID)==mMapConnectionIDRoute.end())
471         {
472                 pthread_mutex_unlock(&mMapConnectionMutex);
473                 return (E_NON_EXISTENT);
474         }
475         pthread_mutex_unlock(&mMapConnectionMutex);
476
477         //the operation is ok, lets create a worker, assign it to a task in the task pool
478         asycDisConnectWorker *worker=new asycDisConnectWorker(this,&mPool,&mShadow,handle,connectionID);
479         if((work=mPool.startWork(worker))==-1)
480         {
481                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
482                 delete worker;
483                 return (E_NOT_POSSIBLE);
484         }
485
486         //save the handle related to the workerID
487         pthread_mutex_lock(&mMapHandleWorkerMutex);
488         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
489         pthread_mutex_unlock(&mMapHandleWorkerMutex);
490
491         return (E_OK);
492 }
493
494
495
496 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
497 {
498         assert(mReceiveInterface!=0);
499         assert(handle.handle!=0);
500         assert(handle.handleType==H_SETSINKVOLUME);
501         assert(sinkID!=0);
502
503         //check if we can take the job
504         am_Sink_s sink;
505         int16_t work=-1;
506
507         //find the sink
508         pthread_mutex_lock(&mSinksMutex);
509         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
510         for(;sinkIter!=mSinks.end();++sinkIter)
511         {
512                 if(sinkIter->sinkID==sinkID)
513                 {
514                         sink=*sinkIter;
515                         break;
516                 }
517         }
518         pthread_mutex_unlock(&mSinksMutex);
519         if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
520
521         asyncSetSinkVolumeWorker *worker=new asyncSetSinkVolumeWorker(this,&mPool,&mShadow,sinkIter->volume,handle,sinkID,volume,ramp,time);
522         if((work=mPool.startWork(worker))==-1)
523         {
524                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
525                 delete worker;
526                 return (E_NOT_POSSIBLE);
527         }
528
529         //save the handle related to the workerID
530         pthread_mutex_lock(&mMapHandleWorkerMutex);
531         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
532         pthread_mutex_unlock(&mMapHandleWorkerMutex);
533
534         return (E_OK);
535 }
536
537
538
539 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
540 {
541         assert(mReceiveInterface!=0);
542         assert(handle.handle!=0);
543         assert(handle.handleType==H_SETSOURCEVOLUME);
544         assert(sourceID!=0);
545
546         //check if we can take the job
547         am_Source_s source;
548         int16_t work=-1;
549
550         //find the sink
551         pthread_mutex_lock(&mSourcesMutex);
552         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
553         for(;sourceIter!=mSources.end();++sourceIter)
554         {
555                 if(sourceIter->sourceID==sourceID)
556                 {
557                         source=*sourceIter;
558                         break;
559                 }
560         }
561         pthread_mutex_unlock(&mSourcesMutex);
562         if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
563
564         asyncSetSourceVolumeWorker *worker=new asyncSetSourceVolumeWorker(this,&mPool,&mShadow,sourceIter->volume,handle,sourceID,volume,ramp,time);
565         if((work=mPool.startWork(worker))==-1)
566         {
567                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
568                 delete worker;
569                 return (E_NOT_POSSIBLE);
570         }
571
572         //save the handle related to the workerID
573         pthread_mutex_lock(&mMapHandleWorkerMutex);
574         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
575         pthread_mutex_unlock(&mMapHandleWorkerMutex);
576
577         return (E_OK);
578 }
579
580
581
582 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
583 {
584         assert(mReceiveInterface!=0);
585         assert(handle.handle!=0);
586         assert(handle.handleType==H_SETSOURCESTATE);
587         assert(sourceID!=0);
588
589         //check if we can take the job
590         am_Source_s source;
591         int16_t work=-1;
592
593         //find the source
594         pthread_mutex_lock(&mSourcesMutex);
595         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
596         for(;sourceIter!=mSources.end();++sourceIter)
597         {
598                 if(sourceIter->sourceID==sourceID)
599                 {
600                         source=*sourceIter;
601                         break;
602                 }
603         }
604         pthread_mutex_unlock(&mSourcesMutex);
605         if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
606
607         asyncSetSourceStateWorker *worker=new asyncSetSourceStateWorker(this,&mPool,&mShadow,handle,sourceID,state);
608         if((work=mPool.startWork(worker))==-1)
609         {
610                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
611                 delete worker;
612                 return (E_NOT_POSSIBLE);
613         }
614
615         //save the handle related to the workerID
616         pthread_mutex_lock(&mMapHandleWorkerMutex);
617         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
618         pthread_mutex_unlock(&mMapHandleWorkerMutex);
619
620         return (E_OK);
621 }
622
623
624 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_SoundProperty_s& soundProperty, const am_sinkID_t sinkID)
625 {
626         assert(mReceiveInterface!=0);
627         assert(handle.handle!=0);
628         assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
629         assert(sinkID!=0);
630
631         //check if we can take the job
632         am_Sink_s sink;
633         int16_t work=-1;
634
635         //find the sink
636         pthread_mutex_lock(&mSinksMutex);
637         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
638         for(;sinkIter!=mSinks.end();++sinkIter)
639         {
640                 if(sinkIter->sinkID==sinkID)
641                 {
642                         sink=*sinkIter;
643                         break;
644                 }
645         }
646         pthread_mutex_unlock(&mSinksMutex);
647         if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
648
649         asyncSetSinkSoundPropertyWorker *worker=new asyncSetSinkSoundPropertyWorker(this,&mPool,&mShadow,handle,soundProperty,sinkID);
650         if((work=mPool.startWork(worker))==-1)
651         {
652                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
653                 delete worker;
654                 return (E_NOT_POSSIBLE);
655         }
656
657         //save the handle related to the workerID
658         pthread_mutex_lock(&mMapHandleWorkerMutex);
659         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
660         pthread_mutex_unlock(&mMapHandleWorkerMutex);
661
662         return (E_OK);
663 }
664
665
666 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
667 {
668         //todo: implement crossfader
669         return E_NOT_USED;
670 }
671
672
673
674 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
675 {
676         assert(mReceiveInterface!=0);
677         assert(domainID!=0);
678
679         //check if we can take the job
680         am_Domain_s domain;
681         int16_t work=-1;
682
683         //find the sink
684         pthread_mutex_lock(&mDomainsMutex);
685         std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
686         for(;domainIter!=mDomains.end();++domainIter)
687         {
688                 if(domainIter->domainID==domainID)
689                 {
690                         domain=*domainIter;
691                         break;
692                 }
693         }
694         pthread_mutex_unlock(&mDomainsMutex);
695         if (domainIter==mDomains.end()) return (E_NON_EXISTENT); //not found!
696
697         asyncDomainStateChangeWorker *worker=new asyncDomainStateChangeWorker(this,&mPool,&mShadow,domainID,domainState);
698         if((work=mPool.startWork(worker))==-1)
699         {
700                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
701                 delete worker;
702                 return (E_NOT_POSSIBLE);
703         }
704
705         return (E_OK);
706
707 }
708
709
710
711 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_SoundProperty_s & soundProperty, const am_sourceID_t sourceID)
712 {
713         assert(mReceiveInterface!=0);
714         assert(handle.handle!=0);
715         assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
716         assert(sourceID!=0);
717
718         //check if we can take the job
719         am_Source_s source;
720         int16_t work=-1;
721
722         //find the source
723         pthread_mutex_lock(&mSourcesMutex);
724         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
725         for(;sourceIter!=mSources.end();++sourceIter)
726         {
727                 if(sourceIter->sourceID==sourceID)
728                 {
729                         source=*sourceIter;
730                         break;
731                 }
732         }
733         pthread_mutex_unlock(&mSourcesMutex);
734         if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
735
736         asyncSetSourceSoundPropertyWorker *worker=new asyncSetSourceSoundPropertyWorker(this,&mPool,&mShadow,handle,soundProperty,sourceID);
737         if((work=mPool.startWork(worker))==-1)
738         {
739                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
740                 delete worker;
741                 return (E_NOT_POSSIBLE);
742         }
743
744         //save the handle related to the workerID
745         pthread_mutex_lock(&mMapHandleWorkerMutex);
746         mMapHandleWorker.insert(std::make_pair(handle.handle,work));
747         pthread_mutex_unlock(&mMapHandleWorkerMutex);
748
749         return (E_OK);
750 }
751
752
753
754 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
755 {
756         BusName="RoutingAsync";
757         return (E_OK);
758 }
759
760 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
761 {
762         //just write two domains into the table and return it
763         std::vector<am_Domain_s> table;
764         am_Domain_s item;
765         item.busname="RoutingAsync";
766         item.domainID=0;
767         item.early=false;
768         item.name="AsyncDomain1";
769         item.nodename="AsyncNode1";
770         item.state=DS_CONTROLLED;
771         table.push_back(item);
772         item.busname="RoutingAsync";
773         item.domainID=0;
774         item.early=false;
775         item.name="AsyncDomain2";
776         item.nodename="AsyncNode2";
777         item.state=DS_CONTROLLED;
778         table.push_back(item);
779         return (table);
780 }
781
782
783
784 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
785 {
786         //create a bunch full of sinks
787         std::vector<am_Sink_s> table;
788         am_Sink_s item;
789         am_SoundProperty_s sp;
790         sp.type=SP_BASS;
791         sp.value=0;
792         for (int16_t i=0;i<=10;i++)
793         {
794                 std::stringstream temp;
795                 temp<<i;
796                 item.domainID=0;     //we cannot know this when the table is created !
797                 item.name="mySink" + temp.str();
798                 item.sinkID=i;           //take fixed ids to make thins easy
799                 item.sinkClassID=1;
800                 item.volume=0;
801                 item.listSoundProperties.push_back(sp);
802                 item.visible=true;
803                 item.listConnectionFormats.push_back(CF_ANALOG);
804                 table.push_back(item);
805         }
806         return (table);
807 }
808
809
810
811 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
812 {
813         //create a bunch full of sources
814         std::vector<am_Source_s> table;
815         am_Source_s item;
816         for (int16_t i=0;i<=10;i++)
817         {
818                 std::stringstream temp;
819                 temp<<i;
820                 item.domainID=0;     //we cannot know this when the table is created !
821                 item.name="mySource" + temp.str();
822                 item.sourceID=i;                 //take fixed ids to make thins easy
823                 item.sourceClassID=1;
824                 item.volume=0;
825                 item.visible=true;
826                 item.listConnectionFormats.push_back(CF_ANALOG);
827                 table.push_back(item);
828         }
829         return (table);
830 }
831
832 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
833 {
834         pthread_mutex_lock(&mMapConnectionMutex);
835         mMapConnectionIDRoute.insert(std::make_pair(connectionID,route));
836         pthread_mutex_unlock(&mMapConnectionMutex);
837 }
838
839 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
840 {
841         pthread_mutex_lock(&mMapHandleWorkerMutex);
842         if (mMapHandleWorker.erase(handle))
843         {
844                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
845         }
846         pthread_mutex_unlock(&mMapHandleWorkerMutex);
847 }
848
849 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
850 {
851         pthread_mutex_lock(&mMapConnectionMutex);
852         if (mMapConnectionIDRoute.erase(connectionID))
853         {
854                 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
855         }
856         pthread_mutex_unlock(&mMapConnectionMutex);
857 }
858
859 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
860 {
861         pthread_mutex_lock(&mSinksMutex);
862         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
863         for(;sinkIter!=mSinks.end();++sinkIter)
864         {
865                 if(sinkIter->sinkID==sinkID)
866                 {
867                         sinkIter->volume=volume;
868                         break;
869                 }
870         }
871         pthread_mutex_unlock(&mSinksMutex);
872 }
873
874 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
875 {
876         pthread_mutex_lock(&mSourcesMutex);
877         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
878         for(;sourceIter!=mSources.end();++sourceIter)
879         {
880                 if(sourceIter->sourceID==sourceID)
881                 {
882                         sourceIter->volume=volume;
883                         break;
884                 }
885         }
886         pthread_mutex_unlock(&mSourcesMutex);
887 }
888
889 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
890 {
891         pthread_mutex_lock(&mSourcesMutex);
892         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
893         for(;sourceIter!=mSources.end();++sourceIter)
894         {
895                 if(sourceIter->sourceID==sourceID)
896                 {
897                         sourceIter->sourceState=state;
898                         break;
899                 }
900         }
901         pthread_mutex_unlock(&mSourcesMutex);
902 }
903
904 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
905 {
906         pthread_mutex_lock(&mSinksMutex);
907         std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
908         for(;sinkIter!=mSinks.end();++sinkIter)
909         {
910                 if(sinkIter->sinkID==sinkID)
911                 {
912                         std::vector<am_SoundProperty_s>::iterator spIterator=sinkIter->listSoundProperties.begin();
913                         for(;spIterator!=sinkIter->listSoundProperties.end();++spIterator)
914                         {
915                                 if(spIterator->type==soundProperty.type)
916                                 {
917                                         spIterator->value=soundProperty.value;
918                                         break;
919                                 }
920                         }
921                 }
922         }
923         pthread_mutex_unlock(&mSinksMutex);
924 }
925
926 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
927 {
928         pthread_mutex_lock(&mSourcesMutex);
929         std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
930         for(;sourceIter!=mSources.end();++sourceIter)
931         {
932                 if(sourceIter->sourceID==sourceID)
933                 {
934                         std::vector<am_SoundProperty_s>::iterator spIterator=sourceIter->listSoundProperties.begin();
935                         for(;spIterator!=sourceIter->listSoundProperties.end();++spIterator)
936                         {
937                                 if(spIterator->type==soundProperty.type)
938                                 {
939                                         spIterator->value=soundProperty.value;
940                                         break;
941                                 }
942                         }
943                 }
944         }
945         pthread_mutex_unlock(&mSourcesMutex);
946 }
947
948 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
949 {
950         pthread_mutex_lock(&mDomainsMutex);
951         std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
952         for(;domainIter!=mDomains.end();++domainIter)
953         {
954                 if(domainIter->domainID==domainID)
955                 {
956                         domainIter->state=domainState;
957                         break;
958                 }
959         }
960         pthread_mutex_unlock(&mDomainsMutex);
961 }
962
963 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
964 {
965         std::vector<am_Gateway_s> table;
966         am_Gateway_s item;
967         item.name="myGateway";
968         item.sinkID=2;
969         item.sourceID=2;
970         table.push_back(item);
971         return (table);
972 }
973
974
975 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender,WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
976         :Worker(pool),
977          mAsyncSender(asyncSender),
978          mShadow(shadow),
979          mHandle(handle),
980          mConnectionID(connectionID),
981          mSourceID(sourceID),
982          mSinkID(sinkID),
983          mConnectionFormat(connectionFormat)
984 {
985 }
986
987 void asycConnectWorker::start2work()
988 {
989         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start connecting"));
990         timespec t;
991         t.tv_nsec=0;
992         t.tv_sec=1;
993
994         //do something for one second
995         if (timedWait(t)) return;
996         am_RoutingElement_s route;
997         route.sinkID=mSinkID;
998         route.sourceID=mSourceID;
999         route.connectionFormat=mConnectionFormat;
1000
1001         //enter new connectionID into the list
1002         mAsyncSender->insertConnectionSafe(mConnectionID,route);
1003
1004         //send the ack
1005         mShadow->ackConnect(mHandle,mConnectionID,E_OK);
1006
1007         //destroy the handle
1008         mAsyncSender->removeHandleSafe(mHandle.handle);
1009 }
1010
1011 void asycConnectWorker::cancelWork()
1012 {
1013         mAsyncSender->removeHandleSafe(mHandle.handle);
1014         mShadow->ackConnect(mHandle,mConnectionID,E_ABORTED);
1015 }
1016
1017 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID)
1018         :Worker(pool),
1019          mAsyncSender(asyncSender),
1020          mShadow(shadow),
1021          mHandle(handle),
1022          mConnectionID(connectionID)
1023 {
1024 }
1025
1026
1027 void asycDisConnectWorker::start2work()
1028 {
1029         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
1030         timespec t;
1031         t.tv_nsec=0;
1032         t.tv_sec=1;
1033
1034         //do something for one second
1035         if (timedWait(t)) return;
1036         am_RoutingElement_s route;
1037
1038         //enter new connectionID into the list
1039         mAsyncSender->insertConnectionSafe(mConnectionID,route);
1040
1041         //send the ack
1042         mShadow->ackDisconnect(mHandle,mConnectionID,E_OK);
1043
1044         //destroy the handle
1045         mAsyncSender->removeHandleSafe(mHandle.handle);
1046
1047 }
1048
1049
1050
1051 void asycDisConnectWorker::cancelWork()
1052 {
1053         mAsyncSender->removeHandleSafe(mHandle.handle);
1054         mShadow->ackDisconnect(mHandle,mConnectionID,E_ABORTED);
1055 }
1056
1057 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow,  const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
1058         :Worker(pool),
1059          mAsyncSender(asyncSender),
1060          mShadow(shadow),
1061          mCurrentVolume(currentVolume),
1062          mHandle(handle),
1063          mSinkID(sinkID),
1064          mVolume(volume),
1065          mRamp(ramp),
1066          mTime(time)
1067 {
1068 }
1069
1070
1071
1072 void asyncSetSinkVolumeWorker::start2work()
1073 {
1074         //todo: this implementation does not respect time and ramp....
1075         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
1076         timespec t;
1077         t.tv_nsec=10000000;
1078         t.tv_sec=0;
1079
1080         while (mCurrentVolume!=mVolume)
1081         {
1082                 if (mCurrentVolume<mVolume) mCurrentVolume++;
1083                 else mCurrentVolume--;
1084                 mShadow->ackSinkVolumeTick(mHandle,mSinkID,mCurrentVolume);
1085                 if (timedWait(t)) return;
1086         }
1087
1088         //enter new connectionID into the list
1089         mAsyncSender->updateSinkVolumeSafe(mSinkID,mCurrentVolume);
1090
1091         //send the ack
1092         mShadow->ackSetSinkVolumeChange(mHandle,mCurrentVolume,E_OK);
1093
1094         //destroy the handle
1095         mAsyncSender->removeHandleSafe(mHandle.handle);
1096 }
1097
1098 void asyncSetSinkVolumeWorker::cancelWork()
1099 {
1100         mAsyncSender->updateSinkVolumeSafe(mSinkID,mCurrentVolume);
1101         mAsyncSender->removeHandleSafe(mHandle.handle);
1102         mShadow->ackSetSinkVolumeChange(mHandle,mCurrentVolume,E_ABORTED);
1103 }
1104
1105 am::asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
1106         :Worker(pool),
1107          mAsyncSender(asyncSender),
1108          mShadow(shadow),
1109          mCurrentVolume(currentVolume),
1110          mHandle(handle),
1111          mSourceID(SourceID),
1112          mVolume(volume),
1113          mRamp(ramp),
1114          mTime(time)
1115 {
1116 }
1117
1118 void am::asyncSetSourceVolumeWorker::start2work()
1119 {
1120         //todo: this implementation does not respect time and ramp....
1121         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
1122         timespec t;
1123         t.tv_nsec=10000000;
1124         t.tv_sec=0;
1125
1126         while (mCurrentVolume!=mVolume)
1127         {
1128                 if (mCurrentVolume<mVolume) mCurrentVolume++;
1129                 else mCurrentVolume--;
1130                 mShadow->ackSourceVolumeTick(mHandle,mSourceID,mCurrentVolume);
1131                 if (timedWait(t)) return;
1132         }
1133
1134         //enter new connectionID into the list
1135         mAsyncSender->updateSourceVolumeSafe(mSourceID,mCurrentVolume);
1136
1137         //send the ack
1138         mShadow->ackSetSourceVolumeChange(mHandle,mCurrentVolume,E_OK);
1139
1140         //destroy the handle
1141         mAsyncSender->removeHandleSafe(mHandle.handle);
1142 }
1143
1144
1145
1146 void am::asyncSetSourceVolumeWorker::cancelWork()
1147 {
1148         mAsyncSender->updateSourceVolumeSafe(mSourceID,mCurrentVolume);
1149         mAsyncSender->removeHandleSafe(mHandle.handle);
1150         mShadow->ackSetSourceVolumeChange(mHandle,mCurrentVolume,E_ABORTED);
1151 }
1152
1153 am::asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
1154         :Worker(pool),
1155          mAsyncSender(asyncSender),
1156          mShadow(shadow),
1157          mHandle(handle),
1158          mSourceID(sourceID),
1159          mSourcestate(state)
1160 {
1161 }
1162
1163
1164
1165 void am::asyncSetSourceStateWorker::start2work()
1166 {
1167         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1168         timespec t;
1169         t.tv_nsec=0;
1170         t.tv_sec=1;
1171
1172         //do something for one second
1173         if (timedWait(t)) return;
1174
1175         //enter new connectionID into the list
1176         mAsyncSender->updateSourceStateSafe(mSourceID,mSourcestate);
1177
1178         //send the ack
1179         mShadow->ackSetSourceState(mHandle,E_OK);
1180
1181         //destroy the handle
1182         mAsyncSender->removeHandleSafe(mHandle.handle);
1183 }
1184
1185
1186
1187 void am::asyncSetSourceStateWorker::cancelWork()
1188 {
1189         //send the ack
1190         mShadow->ackSetSourceState(mHandle,E_ABORTED);
1191
1192         //destroy the handle
1193         mAsyncSender->removeHandleSafe(mHandle.handle);
1194 }
1195
1196 am::asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle,const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID)
1197         :Worker(pool),
1198          mAsyncSender(asyncSender),
1199          mShadow(shadow),
1200          mHandle(),
1201          mSinkID(sinkID),
1202          mSoundProperty(soundProperty)
1203 {
1204 }
1205
1206
1207
1208 void am::asyncSetSinkSoundPropertyWorker::start2work()
1209 {
1210         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1211         timespec t;
1212         t.tv_nsec=0;
1213         t.tv_sec=1;
1214
1215         //do something for one second
1216         if (timedWait(t)) return;
1217
1218         //enter new connectionID into the list
1219         mAsyncSender->updateSinkSoundPropertySafe(mSinkID,mSoundProperty);
1220
1221         //send the ack
1222         mShadow->ackSetSinkSoundProperty(mHandle,E_OK);
1223
1224         //destroy the handle
1225         mAsyncSender->removeHandleSafe(mHandle.handle);
1226 }
1227
1228
1229
1230 void am::asyncSetSinkSoundPropertyWorker::cancelWork()
1231 {
1232         //send the ack
1233         mShadow->ackSetSinkSoundProperty(mHandle,E_OK);
1234
1235         //destroy the handle
1236         mAsyncSender->removeHandleSafe(mHandle.handle);
1237 }
1238
1239 am::asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID)
1240 :Worker(pool),
1241  mAsyncSender(asyncSender),
1242  mShadow(shadow),
1243  mHandle(),
1244  mSourceID(sourceID),
1245  mSoundProperty(soundProperty)
1246 {
1247 }
1248
1249
1250
1251 void am::asyncSetSourceSoundPropertyWorker::start2work()
1252 {
1253         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1254         timespec t;
1255         t.tv_nsec=0;
1256         t.tv_sec=1;
1257
1258         //do something for one second
1259         if (timedWait(t)) return;
1260
1261         //enter new connectionID into the list
1262         mAsyncSender->updateSourceSoundPropertySafe(mSourceID,mSoundProperty);
1263
1264         //send the ack
1265         mShadow->ackSetSourceSoundProperty(mHandle,E_OK);
1266
1267         //destroy the handle
1268         mAsyncSender->removeHandleSafe(mHandle.handle);
1269 }
1270
1271
1272
1273 void am::asyncSetSourceSoundPropertyWorker::cancelWork()
1274 {
1275         //send the ack
1276         mShadow->ackSetSourceSoundProperty(mHandle,E_OK);
1277
1278         //destroy the handle
1279         mAsyncSender->removeHandleSafe(mHandle.handle);
1280 }
1281
1282 am::asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState)
1283         :Worker(pool),
1284          mAsyncSender(asyncSender),
1285          mShadow(shadow),
1286          mDomainID(domainID),
1287          mDomainState(domainState)
1288 {
1289 }
1290
1291 void am::asyncDomainStateChangeWorker::start2work()
1292 {
1293         //todo: sendchanged data must be in here !
1294         DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1295         timespec t;
1296         t.tv_nsec=0;
1297         t.tv_sec=1;
1298
1299
1300         //do something for one second
1301         if (timedWait(t)) return;
1302
1303         //enter new connectionID into the list
1304         mAsyncSender->updateDomainstateSafe(mDomainID,mDomainState);
1305         mShadow->hookDomainStateChange(mDomainID,mDomainState);
1306         //send the new status
1307
1308 }
1309
1310
1311
1312 void am::asyncDomainStateChangeWorker::cancelWork()
1313 {
1314         //send the new status
1315         mShadow->hookDomainStateChange(mDomainID,mDomainState);
1316 }
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355