2 * Copyright (C) 2011, BMW AG
4 * GeniviAudioManager DbusPlugin
6 * \file RoutingSender.cpp
8 * \date 20-Oct-2011 3:42:04 PM
9 * \author Christian Mueller (christian.ei.mueller@bmw.de)
12 * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13 * Copyright (C) 2011, BMW AG Christian Mueller Christian.ei.mueller@bmw.de
15 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17 * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18 * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19 * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20 * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21 * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
35 #include <dbus/dbus.h>
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory() {
42 return (new AsyncRoutingSender());
45 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface) {
46 delete routingSendInterface;
49 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
50 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
51 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
// Thread entry point; routingInterfacesReady starts it with &mShadow as data.
// Requests the session-bus name "org.genivi.test" and maps four test method
// calls (timingChanged, SinkAvailablityStatusChange, SourceAvailablityStatusChange,
// InterruptStatusChange) onto the matching hook of the shadow. Each call is
// answered with a method return carrying the ID that was read from the request.
// NOTE(review): conn, err, msg, reply, args, delay and sinkID have no visible
// declaration in this copy, and no braces are visible - short lines appear to
// have been lost; restore them from the original file before building.
57 void* AsyncRoutingSender::InterruptEvents(void *data)
59 //This is a very very very basic implementation of the dbus interface
60 //there is no failure handling, nothing.
61 //it is used just for testing, not intended to be used otherwise...
62 RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
66 dbus_error_init(&err);
67 conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
68 dbus_uint32_t serial = 0;
// NOTE(review): neither answer nor err is inspected in the visible lines.
71 int answer = dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
73 while (dbus_connection_read_write_dispatch(conn, -1))
75 dbus_connection_read_write(conn, 0);
76 msg = dbus_connection_pop_message(conn);
// timingChanged(connectionID, delay)
78 if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
80 am_connectionID_t connectionID;
82 dbus_message_iter_init(msg, &args);
83 dbus_message_iter_get_basic(&args,(void*) &connectionID);
84 dbus_message_iter_next(&args);
85 dbus_message_iter_get_basic(&args,(void*) &delay);
86 reply = dbus_message_new_method_return(msg);
87 dbus_message_iter_init_append(reply, &args);
// NOTE(review): appended as DBUS_TYPE_INT32 - confirm this matches the width of am_connectionID_t
88 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
89 dbus_connection_send(conn, reply, &serial);
90 shadow->hookTimingInformationChanged(connectionID,delay);
91 dbus_message_unref(reply);
// SinkAvailablityStatusChange(sinkID)
93 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
97 am_Availability_s availability;
98 dbus_message_iter_init(msg, &args);
99 dbus_message_iter_get_basic(&args,(void*) &sinkID);
100 reply = dbus_message_new_method_return(msg);
101 dbus_message_iter_init_append(reply, &args);
102 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
103 dbus_connection_send(conn, reply, &serial);
// NOTE(review): availability is declared above but no visible line assigns it before it is passed on.
104 shadow->hookSinkAvailablityStatusChange(sinkID,availability);
105 dbus_message_unref(reply);
// SourceAvailablityStatusChange(sourceID)
107 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
109 am_sourceID_t sourceID;
111 am_Availability_s availability;
112 dbus_message_iter_init(msg, &args);
113 dbus_message_iter_get_basic(&args,(void*) &sourceID);
114 reply = dbus_message_new_method_return(msg);
115 dbus_message_iter_init_append(reply, &args);
116 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
117 dbus_connection_send(conn, reply, &serial);
// NOTE(review): availability is passed on without a visible assignment here as well.
118 shadow->hookSourceAvailablityStatusChange(sourceID,availability);
119 dbus_message_unref(reply);
// InterruptStatusChange(sourceID)
121 else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
123 am_sourceID_t sourceID;
125 am_InterruptState_e state;
126 dbus_message_iter_init(msg, &args);
127 dbus_message_iter_get_basic(&args,(void*) &sourceID);
128 reply = dbus_message_new_method_return(msg);
129 dbus_message_iter_init_append(reply, &args);
130 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
131 dbus_connection_send(conn, reply, &serial);
// NOTE(review): state is never read from the message in the visible lines; it is passed on uninitialised.
132 shadow->hookInterruptStatusChange(sourceID,state);
133 dbus_message_unref(reply);
135 dbus_connection_flush(conn);
// Thread entry point for one pool slot; data is that slot's threadInfo_s.
// Waits on the slot's block semaphore, reads the assigned worker under
// mBlockingMutex, hands the worker the slot's cancel semaphore, runs it and
// then reports completion to the worker's pool.
// NOTE(review): no loop and no return statement are visible in this copy - presumably lost short lines.
141 void *WorkerThreadPool::WorkerThread(void* data)
143 threadInfo_s *myInfo=(threadInfo_s*)data;
146 sem_wait(&myInfo->block);
147 pthread_mutex_lock(&mBlockingMutex);
148 Worker* actWorker=myInfo->worker;
149 pthread_mutex_unlock(&mBlockingMutex);
150 actWorker->setCancelSempaphore(&myInfo->cancel);
151 actWorker->start2work();
152 actWorker->pPool->finishedWork(myInfo->threadID);
// Creates numThreads worker slots and starts one thread per slot.
// Each thread receives a pointer to its element of mListWorkers, so the vector
// must not be resized while the threads are running.
// NOTE(review): workerID, incremented below, has no visible declaration in this copy.
156 WorkerThreadPool::WorkerThreadPool(int numThreads)
157 :mNumThreads(numThreads)
160 mListWorkers.resize(mNumThreads);
161 for (int i=0;i<mNumThreads;i++)
// NOTE(review): sem_init takes (sem_t*, int pshared, unsigned value); NULL is used here where 0 is meant.
163 sem_init(&mListWorkers[i].block,NULL,NULL);
164 sem_init(&mListWorkers[i].cancel,NULL,NULL);
165 mListWorkers[i].busy=false;
166 mListWorkers[i].workerID=++workerID;
// NOTE(review): the result of pthread_create is not checked.
167 pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
// Hands worker to a pool thread: walks mListWorkers under mBlockingMutex and,
// for the chosen slot, releases the mutex, posts the slot's block semaphore
// and returns the slot's workerID.
// NOTE(review): the slot-selection test and the assignment of worker to the
// slot are not visible in this copy. Callers compare the result with -1, so
// presumably -1 is returned after the final unlock - confirm against the original.
171 int16_t WorkerThreadPool::startWork(Worker *worker)
173 pthread_mutex_lock(&mBlockingMutex);
174 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
175 for(;it!=mListWorkers.end();++it)
181 pthread_mutex_unlock(&mBlockingMutex);
182 sem_post(&it->block);
183 return ((int)it->workerID);
186 pthread_mutex_unlock(&mBlockingMutex);
// Posts the cancel semaphore of the busy slot whose workerID matches.
// NOTE(review): the return statements are not visible in this copy.
// NOTE(review): no lock on mBlockingMutex is visible here, although startWork
// and finishedWork take it before touching mListWorkers - confirm this is intended.
190 bool WorkerThreadPool::cancelWork(int workerID)
192 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
193 for(;it!=mListWorkers.end();++it)
195 if(it->workerID==workerID && it->busy)
197 sem_post(&it->cancel);
// Called by a pool thread when its worker has returned; looks up the slot that
// belongs to threadID while holding mBlockingMutex.
// NOTE(review): what is done with the matching slot is not visible in this copy.
204 void WorkerThreadPool::finishedWork(pthread_t threadID)
206 pthread_mutex_lock(&mBlockingMutex);
207 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
208 for(;it!=mListWorkers.end();++it)
// NOTE(review): pthread_t values are compared with ==; POSIX provides pthread_equal() for this.
210 if(it->threadID==threadID)
217 pthread_mutex_unlock(&mBlockingMutex);
222 WorkerThreadPool::~WorkerThreadPool()
224 for (int i=0;i<mNumThreads;i++)
226 pthread_cancel(mListWorkers[i].threadID);
// Constructs a worker for the given pool.
// NOTE(review): initialiser list and body are not visible in this copy; WorkerThread
// reads the worker's pPool member, so presumably pool is stored there - confirm.
230 Worker::Worker(WorkerThreadPool *pool)
// Receives the cancel semaphore of the pool slot running this worker
// (WorkerThread passes &myInfo->cancel).
// NOTE(review): body not visible in this copy; presumably stores the pointer in mCancelSem,
// which timedWait waits on - confirm.
236 void Worker::setCancelSempaphore(sem_t* cancel)
// Waits on mCancelSem for at most the relative time in timer, by adding timer to
// the current time and passing the sum to sem_timedwait as absolute deadline.
// The start2work implementations stop working when this returns true.
// NOTE(review): the declaration of temp and all return statements are not visible in this copy.
243 bool Worker::timedWait(timespec timer)
// clock id 0 - presumably CLOCK_REALTIME, the clock sem_timedwait measures against; confirm
246 if(clock_gettime(0, &temp)==-1)
248 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
// NOTE(review): tv_nsec is not normalised after the addition; a sum of 1e9 or more
// makes sem_timedwait fail with EINVAL instead of waiting.
250 temp.tv_nsec+=timer.tv_nsec;
251 temp.tv_sec+=timer.tv_sec;
252 //if(sem_wait(mCancelSem)==-1)
253 if (sem_timedwait(mCancelSem,&temp)==-1)
// NOTE(review): a timeout is logged at error level although it looks like the regular
// "nobody cancelled" outcome - confirm the intended severity.
256 if(errno == ETIMEDOUT)
258 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
261 else //failure in waiting, nevertheless, we quit the thread...
263 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
// NOTE(review): same text as the failure branch above; the surrounding structure is not
// visible, so confirm which path this line belongs to.
267 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
// Starts without a receive interface, fills the domain, sink, source and gateway
// tables from the create*Table() helpers, starts with an empty connection map,
// and registers the DLT context "ASY".
// NOTE(review): some initialisers of the list are not visible in this copy.
272 AsyncRoutingSender::AsyncRoutingSender()
274 mReceiveInterface(0),
275 mDomains(createDomainTable()),
276 mSinks(createSinkTable()),
277 mSources(createSourceTable()),
278 mGateways(createGatewayTable()),
280 mMapConnectionIDRoute(),
283 DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
284 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
// Destructor.
// NOTE(review): the body is not visible in this copy.
289 AsyncRoutingSender::~AsyncRoutingSender()
295 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
297 //first, create the Shadow:
298 assert(routingreceiveinterface!=0);
299 mReceiveInterface=routingreceiveinterface;
300 mShadow.setRoutingInterface(routingreceiveinterface);
// Registers the static tables with the receive interface - domains first, then
// sources, then sinks - and finally starts the InterruptEvents thread.
// Gateway registration is commented out.
// NOTE(review): eCode and sinkID have no visible declaration in this copy.
305 void AsyncRoutingSender::routingInterfacesReady()
307 assert(mReceiveInterface!=0);
309 //first register the domains
310 std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
311 for(;domainIter!=mDomains.end();++domainIter)
313 am_domainID_t domainID;
314 if((eCode=mReceiveInterface->registerDomain(*domainIter,domainID))!=E_OK)
316 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
//remember the ID that was passed to registerDomain as output argument
318 domainIter->domainID=domainID;
//register the sources; every source is attached to the first domain
322 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
323 for(;sourceIter!=mSources.end();++sourceIter)
325 am_sourceID_t sourceID;
326 //set the correct domainID
327 sourceIter->domainID=mDomains[0].domainID;
328 if((eCode=mReceiveInterface->registerSource(*sourceIter,sourceID))!=E_OK)
330 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
//register the sinks; every sink is attached to the first domain
335 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
336 for(;sinkIter!=mSinks.end();++sinkIter)
339 //set the correct domainID
340 sinkIter->domainID=mDomains[0].domainID;
341 if((eCode=mReceiveInterface->registerSink(*sinkIter,sinkID))!=E_OK)
343 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
348 // std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
349 // for(;gatewayIter!=mGateways.end();++gatewayIter)
351 // am_gatewayID_t gatewayID;
352 // gatewayIter->domainSinkID=mDomains[0].domainID;
353 // gatewayIter->domainSourceID=mDomains[1].domainID;
354 // gatewayIter->controlDomainID=mDomains[0].domainID;
355 // if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
357 // DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
359 // gatewayIter->gatewayID=gatewayID;
362 //create thread for interrupts:
// NOTE(review): the result of pthread_create is not checked.
363 pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
368 void AsyncRoutingSender::routingInterfacesRundown()
370 assert(mReceiveInterface!=0);
// Aborts the job that belongs to handle: returns E_NON_EXISTENT when the handle
// is not in mMapHandleWorker, otherwise asks the pool to cancel a worker and
// returns E_OK when the pool reports success.
// NOTE(review): the return value for a failed cancel is not visible in this copy.
375 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
377 assert(mReceiveInterface!=0);
378 assert(handle.handle!=0);
380 //first check if we know the handle
381 pthread_mutex_lock(&mMapHandleWorkerMutex);
// NOTE(review): BUG - iter is set to begin() and never reassigned; the result of find()
// below is only compared against end() and then discarded.
382 std::map<uint16_t,int16_t>::iterator iter=mMapHandleWorker.begin();
383 if(mMapHandleWorker.find(handle.handle)==mMapHandleWorker.end())
385 pthread_mutex_unlock(&mMapHandleWorkerMutex);
386 return (E_NON_EXISTENT);
388 pthread_mutex_unlock(&mMapHandleWorkerMutex);
391 //ok, cancel the action:
// NOTE(review): iter->second is therefore the worker of the FIRST map entry, not of handle,
// and it is read after the mutex was released. Fix: iter=mMapHandleWorker.find(handle.handle)
// and copy iter->second while the lock is still held.
392 if(mPool.cancelWork(iter->second)) return (E_OK);
// Starts an asynchronous connect: looks up sinkID and sourceID in the static
// tables (E_NON_EXISTENT when missing), checks that both offer connectionFormat
// (E_WRONG_FORMAT otherwise), hands an asycConnectWorker to the pool
// (E_NOT_POSSIBLE when no thread is free) and records handle -> worker id.
// NOTE(review): work, sink and source are used without a visible declaration or
// assignment in this copy; the final return statement is not visible either.
398 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
400 assert(mReceiveInterface!=0);
401 assert(handle.handle!=0);
402 assert(handle.handleType==H_CONNECT);
403 assert(connectionID!=0);
407 //check if we can take the job
// NOTE(review): no lock on mSinksMutex / mSourcesMutex is visible around these scans,
// unlike the volume and state setters - confirm.
413 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
414 for(;sinkIter!=mSinks.end();++sinkIter)
416 if(sinkIter->sinkID==sinkID)
422 if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
425 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
426 for(;sourceIter!=mSources.end();++sourceIter)
428 if(sourceIter->sourceID==sourceID)
434 if(sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
//both ends must support the requested connection format
437 if(std::find(source.listConnectionFormats.begin(),source.listConnectionFormats.end(),connectionFormat)==source.listConnectionFormats.end()) return (E_WRONG_FORMAT);
438 if(std::find(sink.listConnectionFormats.begin(),sink.listConnectionFormats.end(),connectionFormat)==sink.listConnectionFormats.end()) return (E_WRONG_FORMAT);
440 //the operation is ok, lets create a worker, assign it to a task in the task pool
441 asycConnectWorker *worker=new asycConnectWorker(this,&mPool,&mShadow,handle,connectionID,sourceID,sinkID,connectionFormat);
442 if((work=mPool.startWork(worker))==-1)
444 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
// NOTE(review): confirm that worker is deleted on this path (no delete is visible here).
446 return (E_NOT_POSSIBLE);
449 //save the handle related to the workerID
450 pthread_mutex_lock(&mMapHandleWorkerMutex);
451 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
452 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous disconnect: returns E_NON_EXISTENT when connectionID
// is not in mMapConnectionIDRoute, otherwise hands an asycDisConnectWorker to the
// pool (E_NOT_POSSIBLE when no thread is free) and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
459 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
461 assert(mReceiveInterface!=0);
462 assert(handle.handle!=0);
463 assert(handle.handleType==H_DISCONNECT);
464 assert(connectionID!=0);
466 //check if we can take the job
469 pthread_mutex_lock(&mMapConnectionMutex);
470 if (mMapConnectionIDRoute.find(connectionID)==mMapConnectionIDRoute.end())
472 pthread_mutex_unlock(&mMapConnectionMutex);
473 return (E_NON_EXISTENT);
475 pthread_mutex_unlock(&mMapConnectionMutex);
477 //the operation is ok, lets create a worker, assign it to a task in the task pool
478 asycDisConnectWorker *worker=new asycDisConnectWorker(this,&mPool,&mShadow,handle,connectionID);
479 if((work=mPool.startWork(worker))==-1)
481 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
// NOTE(review): confirm that worker is deleted on this path (no delete is visible here).
483 return (E_NOT_POSSIBLE);
486 //save the handle related to the workerID
487 pthread_mutex_lock(&mMapHandleWorkerMutex);
488 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
489 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change on a sink: returns E_NON_EXISTENT when
// sinkID is unknown, otherwise hands an asyncSetSinkVolumeWorker (seeded with the
// sink's current volume) to the pool and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
496 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
498 assert(mReceiveInterface!=0);
499 assert(handle.handle!=0);
500 assert(handle.handleType==H_SETSINKVOLUME);
503 //check if we can take the job
508 pthread_mutex_lock(&mSinksMutex);
509 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
510 for(;sinkIter!=mSinks.end();++sinkIter)
512 if(sinkIter->sinkID==sinkID)
518 pthread_mutex_unlock(&mSinksMutex);
519 if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
// NOTE(review): sinkIter->volume is read here after mSinksMutex was released.
521 asyncSetSinkVolumeWorker *worker=new asyncSetSinkVolumeWorker(this,&mPool,&mShadow,sinkIter->volume,handle,sinkID,volume,ramp,time);
522 if((work=mPool.startWork(worker))==-1)
524 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
526 return (E_NOT_POSSIBLE);
529 //save the handle related to the workerID
530 pthread_mutex_lock(&mMapHandleWorkerMutex);
531 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
532 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change on a source: returns E_NON_EXISTENT when
// sourceID is unknown, otherwise hands an asyncSetSourceVolumeWorker (seeded with
// the source's current volume) to the pool and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
539 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
541 assert(mReceiveInterface!=0);
542 assert(handle.handle!=0);
543 assert(handle.handleType==H_SETSOURCEVOLUME);
546 //check if we can take the job
551 pthread_mutex_lock(&mSourcesMutex);
552 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
553 for(;sourceIter!=mSources.end();++sourceIter)
555 if(sourceIter->sourceID==sourceID)
561 pthread_mutex_unlock(&mSourcesMutex);
562 if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
// NOTE(review): sourceIter->volume is read here after mSourcesMutex was released.
564 asyncSetSourceVolumeWorker *worker=new asyncSetSourceVolumeWorker(this,&mPool,&mShadow,sourceIter->volume,handle,sourceID,volume,ramp,time);
565 if((work=mPool.startWork(worker))==-1)
567 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
569 return (E_NOT_POSSIBLE);
572 //save the handle related to the workerID
573 pthread_mutex_lock(&mMapHandleWorkerMutex);
574 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
575 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous source-state change: returns E_NON_EXISTENT when
// sourceID is unknown, otherwise hands an asyncSetSourceStateWorker to the pool
// (E_NOT_POSSIBLE when no thread is free) and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
582 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
584 assert(mReceiveInterface!=0);
585 assert(handle.handle!=0);
586 assert(handle.handleType==H_SETSOURCESTATE);
589 //check if we can take the job
594 pthread_mutex_lock(&mSourcesMutex);
595 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
596 for(;sourceIter!=mSources.end();++sourceIter)
598 if(sourceIter->sourceID==sourceID)
604 pthread_mutex_unlock(&mSourcesMutex);
605 if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
607 asyncSetSourceStateWorker *worker=new asyncSetSourceStateWorker(this,&mPool,&mShadow,handle,sourceID,state);
608 if((work=mPool.startWork(worker))==-1)
610 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
612 return (E_NOT_POSSIBLE);
615 //save the handle related to the workerID
616 pthread_mutex_lock(&mMapHandleWorkerMutex);
617 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
618 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous sound-property change on a sink: returns
// E_NON_EXISTENT when sinkID is unknown, otherwise hands an
// asyncSetSinkSoundPropertyWorker to the pool and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
624 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_SoundProperty_s& soundProperty, const am_sinkID_t sinkID)
626 assert(mReceiveInterface!=0);
627 assert(handle.handle!=0);
628 assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
631 //check if we can take the job
636 pthread_mutex_lock(&mSinksMutex);
637 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
638 for(;sinkIter!=mSinks.end();++sinkIter)
640 if(sinkIter->sinkID==sinkID)
646 pthread_mutex_unlock(&mSinksMutex);
647 if (sinkIter==mSinks.end()) return (E_NON_EXISTENT); //not found!
649 asyncSetSinkSoundPropertyWorker *worker=new asyncSetSinkSoundPropertyWorker(this,&mPool,&mShadow,handle,soundProperty,sinkID);
650 if((work=mPool.startWork(worker))==-1)
652 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
654 return (E_NOT_POSSIBLE);
657 //save the handle related to the workerID
658 pthread_mutex_lock(&mMapHandleWorkerMutex);
659 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
660 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Crossfading is not implemented yet.
// NOTE(review): the body, including the value returned, is not visible in this copy.
666 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
668 //todo: implement crossfader
// Starts an asynchronous domain-state change: returns E_NON_EXISTENT when
// domainID is unknown, otherwise hands an asyncDomainStateChangeWorker to the
// pool (E_NOT_POSSIBLE when no thread is free).
// There is no handle parameter, and the visible lines do not enter the worker id
// into mMapHandleWorker.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
674 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
676 assert(mReceiveInterface!=0);
679 //check if we can take the job
684 pthread_mutex_lock(&mDomainsMutex);
685 std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
686 for(;domainIter!=mDomains.end();++domainIter)
688 if(domainIter->domainID==domainID)
694 pthread_mutex_unlock(&mDomainsMutex);
695 if (domainIter==mDomains.end()) return (E_NON_EXISTENT); //not found!
697 asyncDomainStateChangeWorker *worker=new asyncDomainStateChangeWorker(this,&mPool,&mShadow,domainID,domainState);
698 if((work=mPool.startWork(worker))==-1)
700 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
702 return (E_NOT_POSSIBLE);
// Starts an asynchronous sound-property change on a source: returns
// E_NON_EXISTENT when sourceID is unknown, otherwise hands an
// asyncSetSourceSoundPropertyWorker to the pool and records handle -> worker id.
// NOTE(review): work has no visible declaration and the final return is not visible in this copy.
711 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_SoundProperty_s & soundProperty, const am_sourceID_t sourceID)
713 assert(mReceiveInterface!=0);
714 assert(handle.handle!=0);
715 assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
718 //check if we can take the job
723 pthread_mutex_lock(&mSourcesMutex);
724 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
725 for(;sourceIter!=mSources.end();++sourceIter)
727 if(sourceIter->sourceID==sourceID)
733 pthread_mutex_unlock(&mSourcesMutex);
734 if (sourceIter==mSources.end()) return (E_NON_EXISTENT); //not found!
736 asyncSetSourceSoundPropertyWorker *worker=new asyncSetSourceSoundPropertyWorker(this,&mPool,&mShadow,handle,soundProperty,sourceID);
737 if((work=mPool.startWork(worker))==-1)
// NOTE(review): the message names asyncSetSourceState; it should name asyncSetSourceSoundProperty.
739 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
741 return (E_NOT_POSSIBLE);
744 //save the handle related to the workerID
745 pthread_mutex_lock(&mMapHandleWorkerMutex);
746 mMapHandleWorker.insert(std::make_pair(handle.handle,work));
747 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Writes the bus name of this plugin, "RoutingAsync", into BusName.
// NOTE(review): the return statement is not visible in this copy.
754 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
756 BusName="RoutingAsync";
// Builds the static domain table: two entries, AsyncDomain1 and AsyncDomain2,
// both on bus "RoutingAsync" and in state DS_CONTROLLED.
// NOTE(review): the declaration of item, any further field assignments and the
// return statement are not visible in this copy.
760 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
762 //just write two domains into the table and return it
763 std::vector<am_Domain_s> table;
765 item.busname="RoutingAsync";
768 item.name="AsyncDomain1";
769 item.nodename="AsyncNode1";
770 item.state=DS_CONTROLLED;
771 table.push_back(item);
772 item.busname="RoutingAsync";
775 item.name="AsyncDomain2";
776 item.nodename="AsyncNode2";
777 item.state=DS_CONTROLLED;
778 table.push_back(item);
// Builds the static sink table: eleven sinks with the fixed IDs 0..10, each
// named "mySink" plus the contents of temp, with domainID 0 as placeholder.
// NOTE(review): the declaration of item, the setup of sp and temp, and the
// return statement are not visible in this copy.
784 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
786 //create a bunch full of sinks
787 std::vector<am_Sink_s> table;
789 am_SoundProperty_s sp;
792 for (int16_t i=0;i<=10;i++)
794 std::stringstream temp;
796 item.domainID=0; //we cannot know this when the table is created !
797 item.name="mySink" + temp.str();
798 item.sinkID=i; //take fixed ids to make things easy
// NOTE(review): if item is declared outside the loop (as sp is) and not cleared in a
// line lost from this copy, the two push_back calls below make each later sink carry one
// more sound property and connection format than the previous one - confirm.
801 item.listSoundProperties.push_back(sp);
803 item.listConnectionFormats.push_back(CF_ANALOG);
804 table.push_back(item);
// Builds the static source table: eleven sources with the fixed IDs 0..10, each
// named "mySource" plus the contents of temp, source class 1 and domainID 0 as placeholder.
// NOTE(review): the declaration of item, the setup of temp and the return
// statement are not visible in this copy.
811 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
813 //create a bunch full of sources
814 std::vector<am_Source_s> table;
816 for (int16_t i=0;i<=10;i++)
818 std::stringstream temp;
820 item.domainID=0; //we cannot know this when the table is created !
821 item.name="mySource" + temp.str();
822 item.sourceID=i; //take fixed ids to make things easy
823 item.sourceClassID=1;
// NOTE(review): if item is declared outside the loop and not cleared in a line lost from
// this copy, listConnectionFormats grows by one CF_ANALOG entry per source - confirm.
826 item.listConnectionFormats.push_back(CF_ANALOG);
827 table.push_back(item);
832 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
834 pthread_mutex_lock(&mMapConnectionMutex);
835 mMapConnectionIDRoute.insert(std::make_pair(connectionID,route));
836 pthread_mutex_unlock(&mMapConnectionMutex);
839 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
841 pthread_mutex_lock(&mMapHandleWorkerMutex);
842 if (mMapHandleWorker.erase(handle))
844 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
846 pthread_mutex_unlock(&mMapHandleWorkerMutex);
849 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
851 pthread_mutex_lock(&mMapConnectionMutex);
852 if (mMapConnectionIDRoute.erase(connectionID))
854 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
856 pthread_mutex_unlock(&mMapConnectionMutex);
// Stores volume in the mSinks entry whose sinkID matches, while holding mSinksMutex.
// Nothing happens when no entry matches.
// NOTE(review): a break after the assignment is presumably present but not visible in this copy.
859 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
861 pthread_mutex_lock(&mSinksMutex);
862 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
863 for(;sinkIter!=mSinks.end();++sinkIter)
865 if(sinkIter->sinkID==sinkID)
867 sinkIter->volume=volume;
871 pthread_mutex_unlock(&mSinksMutex);
// Stores volume in the mSources entry whose sourceID matches, while holding mSourcesMutex.
// Nothing happens when no entry matches.
// NOTE(review): a break after the assignment is presumably present but not visible in this copy.
874 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
876 pthread_mutex_lock(&mSourcesMutex);
877 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
878 for(;sourceIter!=mSources.end();++sourceIter)
880 if(sourceIter->sourceID==sourceID)
882 sourceIter->volume=volume;
886 pthread_mutex_unlock(&mSourcesMutex);
// Stores state as sourceState of the mSources entry whose sourceID matches,
// while holding mSourcesMutex. Nothing happens when no entry matches.
// NOTE(review): a break after the assignment is presumably present but not visible in this copy.
889 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
891 pthread_mutex_lock(&mSourcesMutex);
892 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
893 for(;sourceIter!=mSources.end();++sourceIter)
895 if(sourceIter->sourceID==sourceID)
897 sourceIter->sourceState=state;
901 pthread_mutex_unlock(&mSourcesMutex);
// While holding mSinksMutex: finds the sink with sinkID and, in its
// listSoundProperties, overwrites the value of the property whose type equals
// soundProperty.type. A property type the sink does not already list is not added.
// NOTE(review): break statements are presumably present but not visible in this copy.
904 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
906 pthread_mutex_lock(&mSinksMutex);
907 std::vector<am_Sink_s>::iterator sinkIter=mSinks.begin();
908 for(;sinkIter!=mSinks.end();++sinkIter)
910 if(sinkIter->sinkID==sinkID)
912 std::vector<am_SoundProperty_s>::iterator spIterator=sinkIter->listSoundProperties.begin();
913 for(;spIterator!=sinkIter->listSoundProperties.end();++spIterator)
915 if(spIterator->type==soundProperty.type)
917 spIterator->value=soundProperty.value;
923 pthread_mutex_unlock(&mSinksMutex);
// While holding mSourcesMutex: finds the source with sourceID and, in its
// listSoundProperties, overwrites the value of the property whose type equals
// soundProperty.type. A property type the source does not already list is not added.
// NOTE(review): break statements are presumably present but not visible in this copy.
926 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
928 pthread_mutex_lock(&mSourcesMutex);
929 std::vector<am_Source_s>::iterator sourceIter=mSources.begin();
930 for(;sourceIter!=mSources.end();++sourceIter)
932 if(sourceIter->sourceID==sourceID)
934 std::vector<am_SoundProperty_s>::iterator spIterator=sourceIter->listSoundProperties.begin();
935 for(;spIterator!=sourceIter->listSoundProperties.end();++spIterator)
937 if(spIterator->type==soundProperty.type)
939 spIterator->value=soundProperty.value;
945 pthread_mutex_unlock(&mSourcesMutex);
// Stores domainState as state of the mDomains entry whose domainID matches,
// while holding mDomainsMutex. Nothing happens when no entry matches.
// NOTE(review): a break after the assignment is presumably present but not visible in this copy.
948 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
950 pthread_mutex_lock(&mDomainsMutex);
951 std::vector<am_Domain_s>::iterator domainIter=mDomains.begin();
952 for(;domainIter!=mDomains.end();++domainIter)
954 if(domainIter->domainID==domainID)
956 domainIter->state=domainState;
960 pthread_mutex_unlock(&mDomainsMutex);
// Builds the static gateway table with a single entry named "myGateway".
// NOTE(review): the declaration of item, any further field assignments and the
// return statement are not visible in this copy.
963 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
965 std::vector<am_Gateway_s> table;
967 item.name="myGateway";
970 table.push_back(item);
// Captures the parameters of one connect job.
// NOTE(review): only part of the initialiser list is visible in this copy; mShadow,
// mHandle, mSourceID and mSinkID are used by start2work but have no visible initialiser here.
975 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender,WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
977 mAsyncSender(asyncSender),
980 mConnectionID(connectionID),
983 mConnectionFormat(connectionFormat)
// Simulated connect: waits via timedWait (returning early when that reports
// true), then stores the route under mConnectionID, acknowledges the connect
// with E_OK and drops the handle.
// NOTE(review): the declaration and setup of t are not visible in this copy.
987 void asycConnectWorker::start2work()
989 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start connecting"));
994 //do something for one second
995 if (timedWait(t)) return;
996 am_RoutingElement_s route;
997 route.sinkID=mSinkID;
998 route.sourceID=mSourceID;
999 route.connectionFormat=mConnectionFormat;
1001 //enter new connectionID into the list
1002 mAsyncSender->insertConnectionSafe(mConnectionID,route);
//send the acknowledgement
1005 mShadow->ackConnect(mHandle,mConnectionID,E_OK);
1007 //destroy the handle
1008 mAsyncSender->removeHandleSafe(mHandle.handle);
1011 void asycConnectWorker::cancelWork()
1013 mAsyncSender->removeHandleSafe(mHandle.handle);
1014 mShadow->ackConnect(mHandle,mConnectionID,E_ABORTED);
// Captures the parameters of one disconnect job.
// NOTE(review): only part of the initialiser list is visible in this copy; mShadow and
// mHandle are used by start2work but have no visible initialiser here.
1017 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID)
1019 mAsyncSender(asyncSender),
1022 mConnectionID(connectionID)
// Simulated disconnect: waits via timedWait (returning early when that reports
// true), then acknowledges the disconnect with E_OK and drops the handle.
// NOTE(review): the declaration and setup of t are not visible in this copy.
1027 void asycDisConnectWorker::start2work()
1029 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
1034 //do something for one second
1035 if (timedWait(t)) return;
1036 am_RoutingElement_s route;
1038 //enter new connectionID into the list
// NOTE(review): BUG - a disconnect should take the connection out of the map, but this
// calls insertConnectionSafe with a route whose fields are never assigned (copied from the
// connect worker). The entry for mConnectionID therefore stays in mMapConnectionIDRoute;
// removeConnectionSafe(mConnectionID) looks like the intended call.
1039 mAsyncSender->insertConnectionSafe(mConnectionID,route);
//send the acknowledgement
1042 mShadow->ackDisconnect(mHandle,mConnectionID,E_OK);
1044 //destroy the handle
1045 mAsyncSender->removeHandleSafe(mHandle.handle);
1051 void asycDisConnectWorker::cancelWork()
1053 mAsyncSender->removeHandleSafe(mHandle.handle);
1054 mShadow->ackDisconnect(mHandle,mConnectionID,E_ABORTED);
// Captures the parameters of one sink-volume job; currentVolume is the value
// the ramp starts from.
// NOTE(review): only part of the initialiser list is visible in this copy; mShadow,
// mHandle, mSinkID and mVolume are used by start2work but have no visible initialiser here.
1057 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
1059 mAsyncSender(asyncSender),
1061 mCurrentVolume(currentVolume),
// Steps mCurrentVolume by one towards mVolume, sending a volume tick and
// calling timedWait after every step (returning early when that reports true).
// When the target is reached, stores the volume in the sink table, acknowledges
// with E_OK and drops the handle.
// NOTE(review): the declaration and setup of t are not visible in this copy.
1072 void asyncSetSinkVolumeWorker::start2work()
1074 //todo: this implementation does not respect time and ramp....
1075 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
1080 while (mCurrentVolume!=mVolume)
1082 if (mCurrentVolume<mVolume) mCurrentVolume++;
1083 else mCurrentVolume--;
1084 mShadow->ackSinkVolumeTick(mHandle,mSinkID,mCurrentVolume);
1085 if (timedWait(t)) return;
1088 //store the reached volume in the sink table
1089 mAsyncSender->updateSinkVolumeSafe(mSinkID,mCurrentVolume);
//send the acknowledgement
1092 mShadow->ackSetSinkVolumeChange(mHandle,mCurrentVolume,E_OK);
1094 //destroy the handle
1095 mAsyncSender->removeHandleSafe(mHandle.handle);
1098 void asyncSetSinkVolumeWorker::cancelWork()
1100 mAsyncSender->updateSinkVolumeSafe(mSinkID,mCurrentVolume);
1101 mAsyncSender->removeHandleSafe(mHandle.handle);
1102 mShadow->ackSetSinkVolumeChange(mHandle,mCurrentVolume,E_ABORTED);
// Captures the parameters of one source-volume job; currentVolume is the value
// the ramp starts from.
// NOTE(review): only part of the initialiser list is visible in this copy; mShadow,
// mHandle and mVolume are used by start2work but have no visible initialiser here.
1105 am::asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
1107 mAsyncSender(asyncSender),
1109 mCurrentVolume(currentVolume),
1111 mSourceID(SourceID),
// Steps mCurrentVolume by one towards mVolume, sending a volume tick and
// calling timedWait after every step (returning early when that reports true).
// When the target is reached, stores the volume in the source table,
// acknowledges with E_OK and drops the handle.
// NOTE(review): the declaration and setup of t are not visible in this copy.
1118 void am::asyncSetSourceVolumeWorker::start2work()
1120 //todo: this implementation does not respect time and ramp....
1121 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
1126 while (mCurrentVolume!=mVolume)
1128 if (mCurrentVolume<mVolume) mCurrentVolume++;
1129 else mCurrentVolume--;
1130 mShadow->ackSourceVolumeTick(mHandle,mSourceID,mCurrentVolume);
1131 if (timedWait(t)) return;
1134 //store the reached volume in the source table
1135 mAsyncSender->updateSourceVolumeSafe(mSourceID,mCurrentVolume);
//send the acknowledgement
1138 mShadow->ackSetSourceVolumeChange(mHandle,mCurrentVolume,E_OK);
1140 //destroy the handle
1141 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Cancel handler of the source volume worker.
 * Passes the volume held in mCurrentVolume to the async sender, removes the handle and
 * acknowledges the volume change with E_ABORTED, reporting that same volume.
 */
1146 void am::asyncSetSourceVolumeWorker::cancelWork()
//hand the current value of mCurrentVolume for this source to the async sender
1148 mAsyncSender->updateSourceVolumeSafe(mSourceID,mCurrentVolume);
//destroy the handle
1149 mAsyncSender->removeHandleSafe(mHandle.handle);
//report the aborted change together with the volume that is currently set
1150 mShadow->ackSetSourceVolumeChange(mHandle,mCurrentVolume,E_ABORTED);
/**
 * Creates a worker that changes the state of a source.
 * \param asyncSender the async routing sender, kept in mAsyncSender
 * \param pool the worker thread pool (presumably handed to the worker base class - confirm)
 * \param shadow the receiver shadow (presumably kept in mShadow, which the worker uses for its acknowledgements - confirm)
 * \param handle the handle of the request (presumably kept in mHandle - confirm)
 * \param sourceID the ID of the source, kept in mSourceID
 * \param state the requested source state (presumably kept in mSourcestate, which start2work writes to the sender - confirm)
 */
1153 am::asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
1155 mAsyncSender(asyncSender),
1158 mSourceID(sourceID),
/**
 * Worker routine that sets the state of a source.
 * Calls timedWait(t) once; if that returns true the routine returns without doing anything
 * else. Otherwise mSourcestate is handed to the async sender for mSourceID, the request is
 * acknowledged with E_OK and the handle is removed.
 */
1165 void am::asyncSetSourceStateWorker::start2work()
1167 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1172 //simulate the work by waiting for the interval t; a true result ends the work immediately (presumably a cancel - to be confirmed)
1173 if (timedWait(t)) return;
1175 //hand the new state of this source to the async sender
1176 mAsyncSender->updateSourceStateSafe(mSourceID,mSourcestate);
//acknowledge the finished state change
1179 mShadow->ackSetSourceState(mHandle,E_OK);
1181 //destroy the handle
1182 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Cancel handler of the source state worker.
 * Acknowledges the request with E_ABORTED and removes the handle; the source state is not
 * written to the async sender here.
 */
1187 void am::asyncSetSourceStateWorker::cancelWork()
//report that the state change was aborted
1190 mShadow->ackSetSourceState(mHandle,E_ABORTED);
1192 //destroy the handle
1193 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Creates a worker that sets a sound property of a sink.
 * \param asyncSender the async routing sender, kept in mAsyncSender
 * \param pool the worker thread pool (presumably handed to the worker base class - confirm)
 * \param shadow the receiver shadow (presumably kept in mShadow, which the worker uses for its acknowledgements - confirm)
 * \param handle the handle of the request (presumably kept in mHandle - confirm)
 * \param soundProperty the sound property to set, kept in mSoundProperty
 * \param sinkID the ID of the sink (presumably kept in mSinkID, which start2work passes to the sender - confirm)
 */
1196 am::asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle,const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID)
1198 mAsyncSender(asyncSender),
1202 mSoundProperty(soundProperty)
/**
 * Worker routine that sets a sound property of a sink.
 * Calls timedWait(t) once; if that returns true the routine returns without doing anything
 * else. Otherwise mSoundProperty is handed to the async sender for mSinkID, the request is
 * acknowledged with E_OK and the handle is removed.
 */
1208 void am::asyncSetSinkSoundPropertyWorker::start2work()
1210 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1215 //simulate the work by waiting for the interval t; a true result ends the work immediately (presumably a cancel - to be confirmed)
1216 if (timedWait(t)) return;
1218 //hand the new sound property of this sink to the async sender
1219 mAsyncSender->updateSinkSoundPropertySafe(mSinkID,mSoundProperty);
//acknowledge the finished change
1222 mShadow->ackSetSinkSoundProperty(mHandle,E_OK);
1224 //destroy the handle
1225 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Cancel handler of the sink sound property worker.
 * Acknowledges the request with E_ABORTED and removes the handle. The sound property is
 * not written to the async sender on this path, so the acknowledgement must not claim
 * success; E_ABORTED is also what the volume and source state workers report on cancel.
 */
1230 void am::asyncSetSinkSoundPropertyWorker::cancelWork()
//report that the sound property change was aborted
1233 mShadow->ackSetSinkSoundProperty(mHandle,E_ABORTED);
1235 //destroy the handle
1236 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Creates a worker that sets a sound property of a source.
 * \param asyncSender the async routing sender, kept in mAsyncSender
 * \param pool the worker thread pool (presumably handed to the worker base class - confirm)
 * \param shadow the receiver shadow (presumably kept in mShadow, which the worker uses for its acknowledgements - confirm)
 * \param handle the handle of the request (presumably kept in mHandle - confirm)
 * \param soundProperty the sound property to set, kept in mSoundProperty
 * \param sourceID the ID of the source, kept in mSourceID
 */
1239 am::asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID)
1241 mAsyncSender(asyncSender),
1244 mSourceID(sourceID),
1245 mSoundProperty(soundProperty)
/**
 * Worker routine that sets a sound property of a source.
 * Calls timedWait(t) once; if that returns true the routine returns without doing anything
 * else. Otherwise mSoundProperty is handed to the async sender for mSourceID, the request
 * is acknowledged with E_OK and the handle is removed.
 */
1251 void am::asyncSetSourceSoundPropertyWorker::start2work()
1253 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1258 //simulate the work by waiting for the interval t; a true result ends the work immediately (presumably a cancel - to be confirmed)
1259 if (timedWait(t)) return;
1261 //hand the new sound property of this source to the async sender
1262 mAsyncSender->updateSourceSoundPropertySafe(mSourceID,mSoundProperty);
//acknowledge the finished change
1265 mShadow->ackSetSourceSoundProperty(mHandle,E_OK);
1267 //destroy the handle
1268 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Cancel handler of the source sound property worker.
 * Acknowledges the request with E_ABORTED and removes the handle. The sound property is
 * not written to the async sender on this path, so the acknowledgement must not claim
 * success; E_ABORTED is also what the volume and source state workers report on cancel.
 */
1273 void am::asyncSetSourceSoundPropertyWorker::cancelWork()
//report that the sound property change was aborted
1276 mShadow->ackSetSourceSoundProperty(mHandle,E_ABORTED);
1278 //destroy the handle
1279 mAsyncSender->removeHandleSafe(mHandle.handle);
/**
 * Creates a worker that changes the state of a domain.
 * \param asyncSender the async routing sender, kept in mAsyncSender
 * \param pool the worker thread pool (presumably handed to the worker base class - confirm)
 * \param shadow the receiver shadow (presumably kept in mShadow, which the worker uses to report the state - confirm)
 * \param domainID the ID of the domain, kept in mDomainID
 * \param domainState the new domain state, kept in mDomainState
 */
1282 am::asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState)
1284 mAsyncSender(asyncSender),
1286 mDomainID(domainID),
1287 mDomainState(domainState)
/**
 * Worker routine that changes the state of a domain.
 * Calls timedWait(t) once; if that returns true the routine returns without doing anything
 * else. Otherwise mDomainState is handed to the async sender for mDomainID and reported
 * through hookDomainStateChange.
 * The log text names the domain state change; it previously repeated the message of the
 * source sound property worker.
 */
1291 void am::asyncDomainStateChangeWorker::start2work()
1293 //todo: sendchanged data must be in here !
1294 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("Start setting domain state"));
1300 //simulate the work by waiting for the interval t; a true result ends the work immediately (presumably a cancel - to be confirmed)
1301 if (timedWait(t)) return;
1303 //hand the new state of this domain to the async sender
1304 mAsyncSender->updateDomainstateSafe(mDomainID,mDomainState);
1305 mShadow->hookDomainStateChange(mDomainID,mDomainState);
1306 //send the new status
/**
 * Cancel handler of the domain state worker.
 * Reports mDomainState for mDomainID through hookDomainStateChange.
 * NOTE(review): unlike start2work, the state is not handed to the async sender before it
 * is reported - confirm that reporting the requested state on cancel is intended.
 */
1312 void am::asyncDomainStateChangeWorker::cancelWork()
1314 //send the new status
1315 mShadow->hookDomainStateChange(mDomainID,mDomainState);