2 * Copyright (C) 2011, BMW AG
4 * GeniviAudioMananger DbusPlugin
6 * \file RoutingSender.cpp
8 * \date 20-Oct-2011 3:42:04 PM
9 * \author Christian Mueller (christian.ei.mueller@bmw.de)
12 * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13 * Copyright (C) 2011, BMW AG Christian Mueller Christian.ei.mueller@bmw.de
15 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17 * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18 * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19 * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20 * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21 * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
25 #include "RoutingSenderAsyn.h"
26 #include "DLTWrapper.h"
35 #include <dbus/dbus.h>
// Declares the DLT context named PluginRoutingAsync through the DLT_DECLARE_CONTEXT macro.
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
// C-linkage factory: returns a newly heap-allocated AsyncRoutingSender as a RoutingSendInterface*.
// The returned object is meant to be released with destroyRoutingPluginInterfaceAsync().
// NOTE(review): presumably looked up by name by the plugin loader - confirm.
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
43 return (new AsyncRoutingSender());
// C-linkage counterpart of the factory: deletes the given plugin instance.
// NOTE(review): deleting through RoutingSendInterface* requires a virtual destructor in that interface - confirm.
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
48 delete routingSendInterface;
// Definitions of the static mutexes, all statically initialised with PTHREAD_MUTEX_INITIALIZER.
// Being static members, each mutex is shared by every instance of its class.
// guards mMapConnectionIDRoute
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
// guards mMapHandleWorker
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
// guards mSinks
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
// guards mSources
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
// guards mDomains
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
// guards the worker slots of the thread pool
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
// Thread function that serves test method calls on the D-Bus session bus under the name
// "org.genivi.test" and forwards them to the RoutingReceiverAsyncShadow passed in `data`.
// Handled methods: timingChanged, SinkAvailablityStatusChange, SourceAvailablityStatusChange and
// InterruptStatusChange. Each call is answered with a method return that echoes the received ID.
// NOTE(review): several short lines of this function (local declarations, braces, possible null
// checks) are not visible in this listing; the comments only describe the visible statements.
58 void* AsyncRoutingSender::InterruptEvents(void *data)
// `data` is interpreted as the shadow object (the commented-out pthread_create in routingInterfacesReady passes &mShadow)
60 RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
// connect to the session bus and claim the test name
64 dbus_error_init(&err);
65 conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
66 dbus_uint32_t serial = 0;
69 dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
// block (timeout -1) until the connection has something to dispatch, then pop the next message
71 while (dbus_connection_read_write_dispatch(conn, -1))
73 dbus_connection_read_write(conn, 0);
74 msg = dbus_connection_pop_message(conn);
// timingChanged(connectionID, delay)
76 if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
78 am_connectionID_t connectionID;
80 dbus_message_iter_init(msg, &args);
81 dbus_message_iter_get_basic(&args,(void*) &connectionID);
82 dbus_message_iter_next(&args);
83 dbus_message_iter_get_basic(&args,(void*) &delay);
// reply with the connectionID
// NOTE(review): the ID is appended as DBUS_TYPE_INT32; confirm this matches the width of am_connectionID_t
84 reply = dbus_message_new_method_return(msg);
85 dbus_message_iter_init_append(reply, &args);
86 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
87 dbus_connection_send(conn, reply, &serial);
88 shadow->hookTimingInformationChanged(connectionID,delay);
89 dbus_message_unref(reply);
// SinkAvailablityStatusChange(sinkID)
91 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
// NOTE(review): no assignment to `availability` is visible before it is handed to the hook below - confirm it is initialised
94 am_Availability_s availability;
95 dbus_message_iter_init(msg, &args);
96 dbus_message_iter_get_basic(&args,(void*) &sinkID);
97 reply = dbus_message_new_method_return(msg);
98 dbus_message_iter_init_append(reply, &args);
99 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
100 dbus_connection_send(conn, reply, &serial);
101 shadow->hookSinkAvailablityStatusChange(sinkID,availability);
102 dbus_message_unref(reply);
// SourceAvailablityStatusChange(sourceID)
104 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
106 am_sourceID_t sourceID;
// NOTE(review): as above, `availability` is passed on without a visible assignment
107 am_Availability_s availability;
108 dbus_message_iter_init(msg, &args);
109 dbus_message_iter_get_basic(&args,(void*) &sourceID);
110 reply = dbus_message_new_method_return(msg);
111 dbus_message_iter_init_append(reply, &args);
112 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
113 dbus_connection_send(conn, reply, &serial);
114 shadow->hookSourceAvailablityStatusChange(sourceID,availability);
115 dbus_message_unref(reply);
// InterruptStatusChange(sourceID)
117 else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
119 am_sourceID_t sourceID;
// the state is not read from the message: the hook below always receives IS_MIN
121 am_InterruptState_e state=IS_MIN;
122 dbus_message_iter_init(msg, &args);
123 dbus_message_iter_get_basic(&args,(void*) &sourceID);
124 reply = dbus_message_new_method_return(msg);
125 dbus_message_iter_init_append(reply, &args);
126 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
127 dbus_connection_send(conn, reply, &serial);
128 shadow->hookInterruptStatusChange(sourceID,state);
129 dbus_message_unref(reply);
// push out the queued replies
// NOTE(review): no dbus_message_unref(msg) is visible for the popped message - confirm it is released
131 dbus_connection_flush(conn);
// Thread function of a pool thread; `data` is the threadInfo_s slot that belongs to this thread.
// NOTE(review): the lines between the cast and sem_wait are not visible; presumably an endless loop - confirm.
136 void *WorkerThreadPool::WorkerThread(void* data)
138 threadInfo_s *myInfo=(threadInfo_s*)data;
// sleep until the `block` semaphore of this slot is posted (startWork() does that)
141 sem_wait(&myInfo->block);
// fetch the assigned worker while holding the pool mutex
142 pthread_mutex_lock(&mBlockingMutex);
143 Worker* actWorker=myInfo->worker;
144 pthread_mutex_unlock(&mBlockingMutex);
// give the worker the slot's cancel semaphore, run the job, then report the slot as finished
145 actWorker->setCancelSempaphore(&myInfo->cancel);
146 actWorker->start2work();
147 actWorker->pPool->finishedWork(myInfo->threadID);
// Creates the pool: one threadInfo_s slot and one pthread per requested thread.
// @param numThreads number of worker threads to start
// NOTE(review): the declaration/initialisation of `workerID` is not visible in this listing.
152 WorkerThreadPool::WorkerThreadPool(int numThreads):
153 mNumThreads(numThreads)
// size the vector once, up front: every thread keeps a pointer to its own element
156 mListWorkers.resize(mNumThreads);
157 for (int i=0;i<mNumThreads;i++)
// sem_init(sem, pshared, value): NULL is used here for the two integer arguments, i.e. both
// semaphores are process-private and start at 0
159 sem_init(&mListWorkers[i].block,NULL,NULL);
160 sem_init(&mListWorkers[i].cancel,NULL,NULL);
161 mListWorkers[i].busy=false;
162 mListWorkers[i].workerID=++workerID;
// NOTE(review): the return value of pthread_create is not checked
163 pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
// Hands a worker to a pool thread and wakes that thread.
// @param worker the job to execute
// @return the workerID of the chosen slot; callers in this file treat -1 as "no thread available"
// NOTE(review): the slot selection inside the loop and the final return are not visible in this listing.
167 int16_t WorkerThreadPool::startWork(Worker *worker)
169 pthread_mutex_lock(&mBlockingMutex);
170 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
171 for(;it!=mListWorkers.end();++it)
// slot chosen: release the pool mutex first, then wake the thread blocked in WorkerThread()
177 pthread_mutex_unlock(&mBlockingMutex);
178 sem_post(&it->block);
179 return ((int)it->workerID);
// loop finished without returning
182 pthread_mutex_unlock(&mBlockingMutex);
// Requests cancellation of the job running in the slot with the given workerID by posting
// the slot's `cancel` semaphore; only busy slots are signalled.
// @param workerID id returned by startWork()
// NOTE(review): the return statements are not visible here; asyncAbort() treats `true` as success.
// NOTE(review): `busy` is read without a visible lock of mBlockingMutex.
186 bool WorkerThreadPool::cancelWork(int workerID)
188 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
189 for(;it!=mListWorkers.end();++it)
191 if(it->workerID==workerID && it->busy)
193 sem_post(&it->cancel);
// Called by a pool thread after its worker returned; looks up the slot of that thread under the pool mutex.
// @param threadID pthread id of the calling pool thread
// NOTE(review): the statements executed for the matching slot are not visible in this listing.
// NOTE(review): pthread_t values are compared with ==; POSIX provides pthread_equal() for that.
200 void WorkerThreadPool::finishedWork(pthread_t threadID)
202 pthread_mutex_lock(&mBlockingMutex);
203 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
204 for(;it!=mListWorkers.end();++it)
206 if(it->threadID==threadID)
213 pthread_mutex_unlock(&mBlockingMutex);
// Sends a cancellation request to every pool thread.
// NOTE(review): no pthread_join or sem_destroy is visible, so threads may still be running
// (and touching mListWorkers) when the pool's members are destroyed - confirm.
216 WorkerThreadPool::~WorkerThreadPool()
218 for (int i=0;i<mNumThreads;i++)
220 pthread_cancel(mListWorkers[i].threadID);
// Constructs a worker that belongs to the given pool.
// NOTE(review): the initialiser list is not visible in this listing; presumably it stores `pool` in pPool - confirm.
224 Worker::Worker(WorkerThreadPool *pool):
// Receives the semaphore on which this worker is told to cancel (called from WorkerThread()).
// NOTE(review): the body is not visible in this listing; presumably it stores the pointer in mCancelSem - confirm.
230 void Worker::setCancelSempaphore(sem_t* cancel)
235 bool Worker::timedWait(timespec timer)
238 if(clock_gettime(0, &temp)==-1)
240 logError("Worker::timedWait error on getting time");
242 temp.tv_nsec+=timer.tv_nsec;
243 temp.tv_sec+=timer.tv_sec;
244 //if(sem_wait(mCancelSem)==-1)
245 if (sem_timedwait(mCancelSem,&temp)==-1)
248 if(errno == ETIMEDOUT)
250 logError("Worker::timedWait timeout waiting error");
253 else //failure in waiting, nevertheless, we quit the thread...
255 logError("Worker::timedWait semaphore waiting error");
259 logError("Worker::timedWait semaphore waiting error");
// Constructs the plugin: no receive interface yet, the fixed domain/sink/source/gateway tables
// are built by the create*Table() helpers, and the handle and connection maps start empty.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
264 AsyncRoutingSender::AsyncRoutingSender():
266 mReceiveInterface(0), //
267 mDomains(createDomainTable()), //
268 mSinks(createSinkTable()), //
269 mSources ( createSourceTable ( ) ), //
270 mGateways ( createGatewayTable ( ) ) , //
271 mMapHandleWorker ( ), //
272 mMapConnectionIDRoute(),//
// Destructor. NOTE(review): the body is not visible in this listing.
277 AsyncRoutingSender::~AsyncRoutingSender()
// Stores the receive interface of the daemon and passes it on to the shadow object.
// @param routingreceiveinterface interface used to call back into the daemon; must not be null (asserted)
281 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
283 //first, create the Shadow:
284 assert(routingreceiveinterface!=0);
285 mReceiveInterface = routingreceiveinterface;
286 mShadow.setRoutingInterface(routingreceiveinterface);
// Registers the fixed tables of this plugin with the daemon: first all domains, then all sources
// and all sinks. Registration failures are logged; no visible line aborts the sequence.
// Requires startupRoutingInterface() to have been called before (asserted).
289 void AsyncRoutingSender::routingInterfacesReady()
291 assert(mReceiveInterface!=0);
293 //first register the domains
294 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
295 for (; domainIter != mDomains.end(); ++domainIter)
297 am_domainID_t domainID;
298 if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
300 logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode);
// keep the ID assigned by the daemon
// NOTE(review): this appears to run after a failed registration as well - confirm domainID is valid then
302 domainIter->domainID = domainID;
// register the sources; every source is attached to the first domain
306 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
307 for (; sourceIter != mSources.end(); ++sourceIter)
309 am_sourceID_t sourceID;
310 //set the correct domainID
311 sourceIter->domainID = mDomains[0].domainID;
312 if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
314 logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode);
// register the sinks; every sink is attached to the first domain as well
319 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
320 for (; sinkIter != mSinks.end(); ++sinkIter)
323 //set the correct domainID
324 sinkIter->domainID = mDomains[0].domainID;
325 if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
327 logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode);
// gateway registration is disabled (commented out)
332 // std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
333 // for(;gatewayIter!=mGateways.end();++gatewayIter)
335 // am_gatewayID_t gatewayID;
336 // gatewayIter->domainSinkID=mDomains[0].domainID;
337 // gatewayIter->domainSourceID=mDomains[1].domainID;
338 // gatewayIter->controlDomainID=mDomains[0].domainID;
339 // if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
341 // logError("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with",eCode));
343 // gatewayIter->gatewayID=gatewayID;
346 //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
347 //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
348 //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
// Rundown notification; the only visible statement checks that a receive interface has been set.
351 void AsyncRoutingSender::routingInterfacesRundown()
353 assert(mReceiveInterface!=0);
356 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
358 assert(mReceiveInterface!=0);
359 assert(handle.handle!=0);
361 //first check if we know the handle
362 pthread_mutex_lock(&mMapHandleWorkerMutex);
363 std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
364 if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
366 pthread_mutex_unlock(&mMapHandleWorkerMutex);
367 return (E_NON_EXISTENT);
369 pthread_mutex_unlock(&mMapHandleWorkerMutex);
371 //ok, cancel the action:
372 if (mPool.cancelWork(iter->second))
// Starts an asynchronous connect of sourceID to sinkID.
// Visible return values: E_NON_EXISTENT if the sink or the source is not in the local tables,
// E_WRONG_FORMAT if either side does not list connectionFormat, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, `break`s and the final return are not visible in this listing.
377 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
379 assert(mReceiveInterface!=0);
380 assert(handle.handle!=0);
381 assert(handle.handleType==H_CONNECT);
382 assert(connectionID!=0);
386 //check if we can take the job
// look up the sink
// NOTE(review): unlike asyncSetSinkVolume(), no lock of mSinksMutex / mSourcesMutex is visible around these scans
392 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
393 for (; sinkIter != mSinks.end(); ++sinkIter)
395 if (sinkIter->sinkID == sinkID)
401 if (sinkIter == mSinks.end())
402 return (E_NON_EXISTENT); //not found!
// look up the source and keep a copy of it
405 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
406 for (; sourceIter != mSources.end(); ++sourceIter)
408 if (sourceIter->sourceID == sourceID)
410 source = *sourceIter;
414 if (sourceIter == mSources.end())
415 return (E_NON_EXISTENT); //not found!
// both ends have to support the requested connection format
418 if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end())
419 return (E_WRONG_FORMAT);
420 if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end())
421 return (E_WRONG_FORMAT);
423 //the operation is ok, lets create a worker, assign it to a task in the task pool
424 asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
425 if ((work = mPool.startWork(worker)) == -1)
427 logError("AsyncRoutingSender::asyncConnect not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
429 return (E_NOT_POSSIBLE);
432 //save the handle related to the workerID
433 pthread_mutex_lock(&mMapHandleWorkerMutex);
434 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
435 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous disconnect of a known connection.
// Visible return values: E_NON_EXISTENT if connectionID is not in mMapConnectionIDRoute,
// E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations and the final return are not visible in this listing.
440 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
442 assert(mReceiveInterface!=0);
443 assert(handle.handle!=0);
444 assert(handle.handleType==H_DISCONNECT);
445 assert(connectionID!=0);
447 //check if we can take the job
// the connection has to be known; checked under the connection-map mutex
450 pthread_mutex_lock(&mMapConnectionMutex);
451 if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
453 pthread_mutex_unlock(&mMapConnectionMutex);
454 return (E_NON_EXISTENT);
456 pthread_mutex_unlock(&mMapConnectionMutex);
458 //the operation is ok, lets create a worker, assign it to a task in the task pool
459 asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
460 if ((work = mPool.startWork(worker)) == -1)
462 logError("AsyncRoutingSender::asyncDisconnect not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
464 return (E_NOT_POSSIBLE);
467 //save the handle related to the workerID
468 pthread_mutex_lock(&mMapHandleWorkerMutex);
469 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
470 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of a sink; the worker starts from the sink's stored volume.
// Visible return values: E_NON_EXISTENT if the sink is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
475 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
477 assert(mReceiveInterface!=0);
478 assert(handle.handle!=0);
479 assert(handle.handleType==H_SETSINKVOLUME);
482 //check if we can take the job
// look up the sink under the sink mutex
487 pthread_mutex_lock(&mSinksMutex);
488 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
489 for (; sinkIter != mSinks.end(); ++sinkIter)
491 if (sinkIter->sinkID == sinkID)
497 pthread_mutex_unlock(&mSinksMutex);
498 if (sinkIter == mSinks.end())
499 return (E_NON_EXISTENT); //not found!
// NOTE(review): sinkIter->volume is read here after mSinksMutex has been released
501 asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
502 if ((work = mPool.startWork(worker)) == -1)
504 logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
506 return (E_NOT_POSSIBLE);
509 //save the handle related to the workerID
510 pthread_mutex_lock(&mMapHandleWorkerMutex);
511 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
512 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of a source; the worker starts from the source's stored volume.
// Visible return values: E_NON_EXISTENT if the source is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
517 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
519 assert(mReceiveInterface!=0);
520 assert(handle.handle!=0);
521 assert(handle.handleType==H_SETSOURCEVOLUME);
524 //check if we can take the job
// look up the source under the source mutex and keep a copy of it
529 pthread_mutex_lock(&mSourcesMutex);
530 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
531 for (; sourceIter != mSources.end(); ++sourceIter)
533 if (sourceIter->sourceID == sourceID)
535 source = *sourceIter;
539 pthread_mutex_unlock(&mSourcesMutex);
540 if (sourceIter == mSources.end())
541 return (E_NON_EXISTENT); //not found!
// NOTE(review): sourceIter->volume is read here after mSourcesMutex has been released; the copy `source` is not used for it
543 asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
544 if ((work = mPool.startWork(worker)) == -1)
546 logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
548 return (E_NOT_POSSIBLE);
551 //save the handle related to the workerID
552 pthread_mutex_lock(&mMapHandleWorkerMutex);
553 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
554 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous source state change.
// Visible return values: E_NON_EXISTENT if the source is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
559 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
561 assert(mReceiveInterface!=0);
562 assert(handle.handle!=0);
563 assert(handle.handleType==H_SETSOURCESTATE);
566 //check if we can take the job
// look up the source under the source mutex
571 pthread_mutex_lock(&mSourcesMutex);
572 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
573 for (; sourceIter != mSources.end(); ++sourceIter)
575 if (sourceIter->sourceID == sourceID)
577 source = *sourceIter;
581 pthread_mutex_unlock(&mSourcesMutex);
582 if (sourceIter == mSources.end())
583 return (E_NON_EXISTENT); //not found!
// hand the job to a pool thread
585 asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
586 if ((work = mPool.startWork(worker)) == -1)
588 logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
590 return (E_NOT_POSSIBLE);
593 //save the handle related to the workerID
594 pthread_mutex_lock(&mMapHandleWorkerMutex);
595 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
596 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous change of one sound property of a sink.
// Visible return values: E_NON_EXISTENT if the sink is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
601 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_sinkID_t sinkID, const am_SoundProperty_s & soundProperty)
603 assert(mReceiveInterface!=0);
604 assert(handle.handle!=0);
605 assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
608 //check if we can take the job
// look up the sink under the sink mutex
613 pthread_mutex_lock(&mSinksMutex);
614 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
615 for (; sinkIter != mSinks.end(); ++sinkIter)
617 if (sinkIter->sinkID == sinkID)
623 pthread_mutex_unlock(&mSinksMutex);
624 if (sinkIter == mSinks.end())
625 return (E_NON_EXISTENT); //not found!
// hand the job to a pool thread
627 asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
628 if ((work = mPool.startWork(worker)) == -1)
630 logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
632 return (E_NOT_POSSIBLE);
635 //save the handle related to the workerID
636 pthread_mutex_lock(&mMapHandleWorkerMutex);
637 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
638 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Crossfading is not implemented by this plugin yet (see the todo below).
// NOTE(review): the remaining body lines and the return value are not visible in this listing.
643 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
645 //todo: implement crossfader
// Starts an asynchronous domain state change. No handle is involved, so nothing is stored in mMapHandleWorker.
// Visible return values: E_NON_EXISTENT if the domain is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
654 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
656 assert(mReceiveInterface!=0);
659 //check if we can take the job
// look up the domain under the domain mutex
664 pthread_mutex_lock(&mDomainsMutex);
665 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
666 for (; domainIter != mDomains.end(); ++domainIter)
668 if (domainIter->domainID == domainID)
670 domain = *domainIter;
674 pthread_mutex_unlock(&mDomainsMutex);
675 if (domainIter == mDomains.end())
676 return (E_NON_EXISTENT); //not found!
// hand the job to a pool thread
678 asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
679 if ((work = mPool.startWork(worker)) == -1)
681 logError("AsyncRoutingSender::setDomainState not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
683 return (E_NOT_POSSIBLE);
// Starts an asynchronous change of one sound property of a source.
// Visible return values: E_NON_EXISTENT if the source is unknown, E_NOT_POSSIBLE if the pool has no free thread.
// NOTE(review): local declarations, the `break` and the final return are not visible in this listing.
690 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SoundProperty_s & soundProperty)
692 assert(mReceiveInterface!=0);
693 assert(handle.handle!=0);
694 assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
697 //check if we can take the job
// look up the source under the source mutex
702 pthread_mutex_lock(&mSourcesMutex);
703 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
704 for (; sourceIter != mSources.end(); ++sourceIter)
706 if (sourceIter->sourceID == sourceID)
708 source = *sourceIter;
712 pthread_mutex_unlock(&mSourcesMutex);
713 if (sourceIter == mSources.end())
714 return (E_NON_EXISTENT); //not found!
// hand the job to a pool thread
716 asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
717 if ((work = mPool.startWork(worker)) == -1)
// NOTE(review): the message names asyncSetSourceState although this is asyncSetSourceSoundProperty (copy-paste)
719 logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
// NOTE(review): confirm the worker allocated above is released on this path (the line in between is not visible)
721 return (E_NOT_POSSIBLE);
724 //save the handle related to the workerID
725 pthread_mutex_lock(&mMapHandleWorkerMutex);
726 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
727 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Writes the bus name of this plugin, "RoutingAsync", into BusName (the same name is used as
// `busname` of the domains built by createDomainTable()).
// NOTE(review): the return statement is not visible in this listing.
732 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
734 BusName = "RoutingAsync";
// Builds the fixed domain table: two controlled domains (AsyncDomain1 / AsyncDomain2) on bus "RoutingAsync".
// The same `item` variable is reused for both entries.
// NOTE(review): some field assignments (short lines) and the return are not visible in this listing.
738 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
740 //just write two domains into the table and return it
741 std::vector<am_Domain_s> table;
// first domain
743 item.busname = "RoutingAsync";
746 item.name = "AsyncDomain1";
747 item.nodename = "AsyncNode1";
748 item.state = DS_CONTROLLED;
749 table.push_back(item);
// second domain
750 item.busname = "RoutingAsync";
753 item.name = "AsyncDomain2";
754 item.nodename = "AsyncNode2";
755 item.state = DS_CONTROLLED;
756 table.push_back(item);
// Builds the fixed sink table: eleven sinks with the IDs 0..10, sink class 1 and CF_ANALOG as connection format.
// NOTE(review): some field assignments, the filling of `temp` and `sp`, and the return are not visible here.
// NOTE(review): if `item` is declared outside the loop, listSoundProperties/listConnectionFormats grow with every iteration - confirm.
// NOTE(review): sinkID 0 is part of the table although some callers assert IDs != 0 - confirm intent.
760 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
762 //create a bunch full of sinks
763 std::vector<am_Sink_s> table;
765 am_SoundProperty_s sp;
768 for (int16_t i = 0; i <= 10; i++)
// `temp` supplies the suffix of the sink name
770 std::stringstream temp;
772 item.domainID = 0; //we cannot know this when the table is created !
773 item.name = "mySink" + temp.str();
774 item.sinkID = i; //take fixed ids to make thins easy
775 item.sinkClassID = 1;
777 item.listSoundProperties.push_back(sp);
779 item.listConnectionFormats.push_back(CF_ANALOG);
780 table.push_back(item);
// Builds the fixed source table: eleven sources with the IDs 0..10, source class 1 and CF_ANALOG as connection format.
// NOTE(review): some field assignments, the filling of `temp`, and the return are not visible here.
// NOTE(review): if `item` is declared outside the loop, listConnectionFormats grows with every iteration - confirm.
785 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
787 //create a bunch full of sources
788 std::vector<am_Source_s> table;
790 for (int16_t i = 0; i <= 10; i++)
// `temp` supplies the suffix of the source name
792 std::stringstream temp;
794 item.domainID = 0; //we cannot know this when the table is created !
795 item.name = "mySource" + temp.str();
796 item.sourceID = i; //take fixed ids to make thins easy
797 item.sourceClassID = 1;
800 item.listConnectionFormats.push_back(CF_ANALOG);
801 table.push_back(item);
// Stores the route of a connection in mMapConnectionIDRoute while holding mMapConnectionMutex.
// @param connectionID key of the new entry
// @param route source, sink and format of the connection
// NOTE(review): insert() is used, so this presumably leaves an already existing entry untouched - confirm the map type.
806 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
808 pthread_mutex_lock(&mMapConnectionMutex);
809 mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
810 pthread_mutex_unlock(&mMapConnectionMutex);
813 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
815 pthread_mutex_lock(&mMapHandleWorkerMutex);
816 if (mMapHandleWorker.erase(handle))
818 logError("AsyncRoutingSender::removeHandle could not remove handle");
820 pthread_mutex_unlock(&mMapHandleWorkerMutex);
823 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
825 pthread_mutex_lock(&mMapConnectionMutex);
826 if (mMapConnectionIDRoute.erase(connectionID))
828 logError("AsyncRoutingSender::removeConnectionSafe could not remove connection");
830 pthread_mutex_unlock(&mMapConnectionMutex);
// Stores a new volume for the sink with the given ID while holding mSinksMutex.
// An unknown sinkID leaves the table unchanged.
833 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
835 pthread_mutex_lock(&mSinksMutex);
836 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
837 for (; sinkIter != mSinks.end(); ++sinkIter)
839 if (sinkIter->sinkID == sinkID)
841 sinkIter->volume = volume;
845 pthread_mutex_unlock(&mSinksMutex);
// Stores a new volume for the source with the given ID while holding mSourcesMutex.
// An unknown sourceID leaves the table unchanged.
848 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
850 pthread_mutex_lock(&mSourcesMutex);
851 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
852 for (; sourceIter != mSources.end(); ++sourceIter)
854 if (sourceIter->sourceID == sourceID)
856 sourceIter->volume = volume;
860 pthread_mutex_unlock(&mSourcesMutex);
// Stores a new source state for the source with the given ID while holding mSourcesMutex.
// An unknown sourceID leaves the table unchanged.
863 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
865 pthread_mutex_lock(&mSourcesMutex);
866 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
867 for (; sourceIter != mSources.end(); ++sourceIter)
869 if (sourceIter->sourceID == sourceID)
871 sourceIter->sourceState = state;
875 pthread_mutex_unlock(&mSourcesMutex);
// Updates the value of one sound property of a sink while holding mSinksMutex.
// The property is matched by soundProperty.type; nothing is added if the sink has no property of that type.
878 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
880 pthread_mutex_lock(&mSinksMutex);
881 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
882 for (; sinkIter != mSinks.end(); ++sinkIter)
884 if (sinkIter->sinkID == sinkID)
// scan the sink's properties for the matching type
886 std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
887 for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
889 if (spIterator->type == soundProperty.type)
891 spIterator->value = soundProperty.value;
897 pthread_mutex_unlock(&mSinksMutex);
// Updates the value of one sound property of a source while holding mSourcesMutex.
// The property is matched by soundProperty.type; nothing is added if the source has no property of that type.
900 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
902 pthread_mutex_lock(&mSourcesMutex);
903 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
904 for (; sourceIter != mSources.end(); ++sourceIter)
906 if (sourceIter->sourceID == sourceID)
// scan the source's properties for the matching type
908 std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
909 for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
911 if (spIterator->type == soundProperty.type)
913 spIterator->value = soundProperty.value;
919 pthread_mutex_unlock(&mSourcesMutex);
// Stores a new state for the domain with the given ID while holding mDomainsMutex.
// An unknown domainID leaves the table unchanged.
922 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
924 pthread_mutex_lock(&mDomainsMutex);
925 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
926 for (; domainIter != mDomains.end(); ++domainIter)
928 if (domainIter->domainID == domainID)
930 domainIter->state = domainState;
934 pthread_mutex_unlock(&mDomainsMutex);
// Returns the RoutingSendVersion constant as the interface version of this plugin.
937 uint16_t AsyncRoutingSender::getInterfaceVersion() const
939 return (RoutingSendVersion);
// Setting several source sound properties at once: the visible body only marks the parameter as unused.
// NOTE(review): the remaining body lines and the return value are not visible in this listing; presumably a stub - confirm.
942 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const am_sourceID_t sourceID, const std::vector<am_SoundProperty_s> & listSoundProperties)
947 (void) listSoundProperties;
// Setting several sink sound properties at once: the visible body only marks the parameter as unused.
// NOTE(review): the remaining body lines and the return value are not visible in this listing; presumably a stub - confirm.
951 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const am_sinkID_t sinkID, const std::vector<am_SoundProperty_s> & listSoundProperties)
956 (void) listSoundProperties;
// Builds the gateway table with a single entry named "myGateway". The table is stored in mGateways
// by the constructor; its registration in routingInterfacesReady() is commented out.
// NOTE(review): further field assignments and the return are not visible in this listing.
960 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
962 std::vector<am_Gateway_s> table;
964 item.name = "myGateway";
967 table.push_back(item);
// Captures everything the connect job needs: the sender to update, the shadow to acknowledge through,
// the handle and the connection data.
// NOTE(review): some initialisers (short lines, e.g. for the base class, shadow, handle, sinkID) are not visible here.
971 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
973 mAsyncSender(asyncSender), //
976 mConnectionID(connectionID), //
977 mSourceID(sourceID), //
979 mConnectionFormat(connectionFormat)
// Executes the connect job on a pool thread: records the route under the connection ID,
// acknowledges the connect with E_OK and removes the handle mapping.
// NOTE(review): the timer set-up and the wait/cancel check are not visible in this listing.
983 void asycConnectWorker::start2work()
985 logInfo("Start connecting");
990 //do something for one second
// describe the new connection
993 am_RoutingElement_s route;
994 route.sinkID = mSinkID;
995 route.sourceID = mSourceID;
996 route.connectionFormat = mConnectionFormat;
998 //enter new connectionID into the list
999 mAsyncSender->insertConnectionSafe(mConnectionID, route);
// report success to the daemon
1002 mShadow->ackConnect(mHandle, mConnectionID, E_OK);
1004 //destroy the handle
1005 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancellation of the connect job: removes the handle mapping and acknowledges the connect with E_ABORTED.
1008 void asycConnectWorker::cancelWork()
1010 mAsyncSender->removeHandleSafe(mHandle.handle);
1011 mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
// Captures the sender, the shadow, the handle and the connection ID of the disconnect job.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
1014 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
1016 mAsyncSender(asyncSender), //
1019 mConnectionID(connectionID)
1023 void asycDisConnectWorker::start2work()
1025 logInfo("Start disconnecting");
1030 //do something for one second
1033 am_RoutingElement_s route;
1035 //enter new connectionID into the list
1036 mAsyncSender->insertConnectionSafe(mConnectionID, route);
1039 mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1041 //destroy the handle
1042 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancellation of the disconnect job: removes the handle mapping and acknowledges the disconnect with E_ABORTED.
1046 void asycDisConnectWorker::cancelWork()
1048 mAsyncSender->removeHandleSafe(mHandle.handle);
1049 mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
// Captures the data of the sink volume job; currentVolume is the volume the job starts from.
// NOTE(review): most initialisers (short lines) are not visible in this listing.
1052 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1054 mAsyncSender(asyncSender), //
1056 mCurrentVolume(currentVolume), //
// Executes the sink volume job: mCurrentVolume is moved towards mVolume in a loop and a volume tick
// is sent for every step; afterwards the final volume is stored, acknowledged with E_OK and the
// handle mapping is removed.
// NOTE(review): the step statements and the wait/cancel check inside the loop are not visible in this listing.
1065 void asyncSetSinkVolumeWorker::start2work()
1067 //todo: this implementation does not respect time and ramp....
1068 logInfo("Start setting sink volume");
// 10 ms timer value
1070 t.tv_nsec = 10000000;
1073 while (mCurrentVolume != mVolume)
1075 if (mCurrentVolume < mVolume)
1079 mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
// store the reached volume in the sink table (the original comment below is a copy-paste leftover)
1084 //enter new connectionID into the list
1085 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
// report success to the daemon
1088 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1090 //destroy the handle
1091 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancellation of the sink volume job: stores the volume reached so far, removes the handle mapping
// and acknowledges the change with that volume and E_ABORTED.
1094 void asyncSetSinkVolumeWorker::cancelWork()
1096 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1097 mAsyncSender->removeHandleSafe(mHandle.handle);
1098 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
// Captures the data of the source volume job; currentVolume is the volume the job starts from.
// NOTE(review): most initialisers (short lines) are not visible in this listing.
1101 asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1103 mAsyncSender(asyncSender), //
1105 mCurrentVolume(currentVolume), //
1107 mSourceID(SourceID), //
// Executes the source volume job: mCurrentVolume is moved towards mVolume in a loop and a volume tick
// is sent for every step; afterwards the final volume is stored, acknowledged with E_OK and the
// handle mapping is removed.
// NOTE(review): the step statements and the wait/cancel check inside the loop are not visible in this listing.
1114 void asyncSetSourceVolumeWorker::start2work()
1116 //todo: this implementation does not respect time and ramp....
1117 logInfo("Start setting source volume");
// 10 ms timer value
1119 t.tv_nsec = 10000000;
1122 while (mCurrentVolume != mVolume)
1124 if (mCurrentVolume < mVolume)
1128 mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
// store the reached volume in the source table (the original comment below is a copy-paste leftover)
1133 //enter new connectionID into the list
1134 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
// report success to the daemon
1137 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1139 //destroy the handle
1140 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancellation of the source volume job: stores the volume reached so far, removes the handle mapping
// and acknowledges the change with that volume and E_ABORTED.
1143 void asyncSetSourceVolumeWorker::cancelWork()
1145 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1146 mAsyncSender->removeHandleSafe(mHandle.handle);
1147 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
// Captures the sender, the shadow, the handle, the source ID and the requested state of the job.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
1150 asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1152 mAsyncSender(asyncSender), //
1155 mSourceID(sourceID), //
// Executes the source state job: stores the new state in the source table, acknowledges with E_OK
// and removes the handle mapping.
// NOTE(review): the timer set-up and the wait/cancel check are not visible in this listing.
1160 void asyncSetSourceStateWorker::start2work()
1162 logInfo("Start setting source state");
1167 //do something for one second
// store the new state (the original comment below is a copy-paste leftover)
1171 //enter new connectionID into the list
1172 mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
// report success to the daemon
1175 mShadow->ackSetSourceState(mHandle, E_OK);
1177 //destroy the handle
1178 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancellation of the source state job: acknowledges with E_ABORTED and removes the handle mapping.
1181 void asyncSetSourceStateWorker::cancelWork()
1184 mShadow->ackSetSourceState(mHandle, E_ABORTED);
1186 //destroy the handle
1187 mAsyncSender->removeHandleSafe(mHandle.handle);
// Captures the sender, the shadow, the handle, the sink ID and the sound property of the job.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
1190 asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1192 mAsyncSender(asyncSender), //
1196 mSoundProperty(soundProperty)
// Executes the sink sound property job: stores the new property value in the sink table,
// acknowledges with E_OK and removes the handle mapping.
// NOTE(review): the timer set-up and the wait/cancel check are not visible in this listing.
1200 void asyncSetSinkSoundPropertyWorker::start2work()
1202 logInfo("Start setting sink sound property");
1207 //do something for one second
// store the new value (the original comment below is a copy-paste leftover)
1211 //enter new connectionID into the list
1212 mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
// report success to the daemon
1215 mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1217 //destroy the handle
1218 mAsyncSender->removeHandleSafe(mHandle.handle);
1221 void asyncSetSinkSoundPropertyWorker::cancelWork()
1224 mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1226 //destroy the handle
1227 mAsyncSender->removeHandleSafe(mHandle.handle);
// Captures the sender, the shadow, the handle, the source ID and the sound property of the job.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
1230 asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1232 mAsyncSender(asyncSender), //
1235 mSourceID(sourceID), //
1236 mSoundProperty(soundProperty)
// Executes the source sound property job: stores the new property value in the source table,
// acknowledges with E_OK and removes the handle mapping.
// NOTE(review): the timer set-up and the wait/cancel check are not visible in this listing.
1240 void asyncSetSourceSoundPropertyWorker::start2work()
1242 logInfo("Start setting source sound property");
1247 //do something for one second
// store the new value (the original comment below is a copy-paste leftover)
1251 //enter new connectionID into the list
1252 mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
// report success to the daemon
1255 mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1257 //destroy the handle
1258 mAsyncSender->removeHandleSafe(mHandle.handle);
1261 void asyncSetSourceSoundPropertyWorker::cancelWork()
1264 mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1266 //destroy the handle
1267 mAsyncSender->removeHandleSafe(mHandle.handle);
// Captures the sender, the shadow, the domain ID and the requested domain state of the job.
// NOTE(review): some initialisers (short lines) are not visible in this listing.
1270 asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1272 mAsyncSender(asyncSender), //
1274 mDomainID(domainID), //
1275 mDomainState(domainState)
// Executes the domain state job: stores the new state in the domain table and reports it to the
// daemon through hookDomainStateChange.
// NOTE(review): the timer set-up and the wait/cancel check are not visible in this listing.
1279 void asyncDomainStateChangeWorker::start2work()
1281 //todo: sendchanged data must be in here !
// NOTE(review): the log text below is copied from the source sound property worker; this job changes a domain state
1282 logInfo("Start setting source sound property");
1287 //do something for one second
// store the new state (the original comment below is a copy-paste leftover)
1291 //enter new connectionID into the list
1292 mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1293 mShadow->hookDomainStateChange(mDomainID, mDomainState);
1294 //send the new status
// Cancellation of the domain state job: reports the requested state through hookDomainStateChange.
// NOTE(review): the requested state is reported although updateDomainstateSafe() is not called on this path - confirm intent.
1298 void am::asyncDomainStateChangeWorker::cancelWork()
1300 //send the new status
1301 mShadow->hookDomainStateChange(mDomainID, mDomainState);