2 * Copyright (C) 2011, BMW AG
4 * GeniviAudioManager DbusPlugin
6 * \file RoutingSender.cpp
8 * \date 20-Oct-2011 3:42:04 PM
9 * \author Christian Mueller (christian.ei.mueller@bmw.de)
12 * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13 * Copyright (C) 2011, BMW AG Christian Mueller Christian.ei.mueller@bmw.de
15 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17 * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18 * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19 * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20 * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21 * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
35 #include <dbus/dbus.h>
// DLT logging context used by the DLT_LOG calls in this file; it is registered in the
// AsyncRoutingSender constructor.
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
// C-linkage factory of the plugin: returns a newly allocated AsyncRoutingSender
// through the RoutingSendInterface base pointer.
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
43 return (new AsyncRoutingSender());
// C-linkage counterpart of the factory: deletes the given interface object.
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
48 delete routingSendInterface;
// Definitions of the static mutexes, all statically initialised.
// In this file they are locked around accesses to: mMapConnectionIDRoute, mMapHandleWorker,
// mSinks, mSources and mDomains (AsyncRoutingSender) and mListWorkers (WorkerThreadPool).
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
// Thread entry point serving a minimal D-Bus test interface on the session bus.
// `data` is cast to RoutingReceiverAsyncShadow*. For each recognised method call on
// "org.genivi.test" the first argument is read, echoed back in a method return, and the
// matching hook on the shadow is invoked.
// NOTE(review): several original lines (braces, short declarations such as the D-Bus
// message/iterator variables) are not visible in this excerpt.
58 void* AsyncRoutingSender::InterruptEvents(void *data)
60 //This is a very very very basic implementation of the dbus interface
61 //there is not failure handling, nothing.
62 //it is used just for testing, not intended to be used otherwise...
63 RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
67 dbus_error_init(&err);
68 conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
69 dbus_uint32_t serial = 0;
// The results of dbus_bus_get and dbus_bus_request_name are not checked in the visible lines.
72 int answer = dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
74 while (dbus_connection_read_write_dispatch(conn, -1))
76 dbus_connection_read_write(conn, 0);
77 msg = dbus_connection_pop_message(conn);
// timingChanged: reads connectionID and delay, replies with connectionID.
79 if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
81 am_connectionID_t connectionID;
83 dbus_message_iter_init(msg, &args);
84 dbus_message_iter_get_basic(&args,(void*) &connectionID);
85 dbus_message_iter_next(&args);
86 dbus_message_iter_get_basic(&args,(void*) &delay);
87 reply = dbus_message_new_method_return(msg);
88 dbus_message_iter_init_append(reply, &args);
89 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
90 dbus_connection_send(conn, reply, &serial);
91 shadow->hookTimingInformationChanged(connectionID,delay);
92 dbus_message_unref(reply);
// SinkAvailablityStatusChange: reads sinkID, replies with sinkID.
// NOTE(review): no assignment to `availability` is visible before it is passed to the hook - confirm.
94 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
98 am_Availability_s availability;
99 dbus_message_iter_init(msg, &args);
100 dbus_message_iter_get_basic(&args,(void*) &sinkID);
101 reply = dbus_message_new_method_return(msg);
102 dbus_message_iter_init_append(reply, &args);
103 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
104 dbus_connection_send(conn, reply, &serial);
105 shadow->hookSinkAvailablityStatusChange(sinkID,availability);
106 dbus_message_unref(reply);
// SourceAvailablityStatusChange: reads sourceID, replies with sourceID.
// NOTE(review): no assignment to `availability` is visible before it is passed to the hook - confirm.
108 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
110 am_sourceID_t sourceID;
112 am_Availability_s availability;
113 dbus_message_iter_init(msg, &args);
114 dbus_message_iter_get_basic(&args,(void*) &sourceID);
115 reply = dbus_message_new_method_return(msg);
116 dbus_message_iter_init_append(reply, &args);
117 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
118 dbus_connection_send(conn, reply, &serial);
119 shadow->hookSourceAvailablityStatusChange(sourceID,availability);
120 dbus_message_unref(reply);
// InterruptStatusChange: reads sourceID, replies with sourceID.
// NOTE(review): no assignment to `state` is visible before it is passed to the hook - confirm.
122 else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
124 am_sourceID_t sourceID;
126 am_InterruptState_e state;
127 dbus_message_iter_init(msg, &args);
128 dbus_message_iter_get_basic(&args,(void*) &sourceID);
129 reply = dbus_message_new_method_return(msg);
130 dbus_message_iter_init_append(reply, &args);
131 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
132 dbus_connection_send(conn, reply, &serial);
133 shadow->hookInterruptStatusChange(sourceID,state);
134 dbus_message_unref(reply);
// NOTE(review): no dbus_message_unref(msg) is visible for the popped message - confirm.
136 dbus_connection_flush(conn);
// Entry point of a pool thread. `data` is this thread's threadInfo_s.
// Blocks on myInfo->block, reads the assigned worker under mBlockingMutex, passes it the
// thread's cancel semaphore, runs start2work() and then reports completion to the pool.
// NOTE(review): the surrounding loop construct and the return are not visible in this excerpt.
140 void *WorkerThreadPool::WorkerThread(void* data)
142 threadInfo_s *myInfo=(threadInfo_s*)data;
145 sem_wait(&myInfo->block);
146 pthread_mutex_lock(&mBlockingMutex);
147 Worker* actWorker=myInfo->worker;
148 pthread_mutex_unlock(&mBlockingMutex);
149 actWorker->setCancelSempaphore(&myInfo->cancel);
150 actWorker->start2work();
151 actWorker->pPool->finishedWork(myInfo->threadID);
// Creates `numThreads` pool threads. For each entry of mListWorkers the two semaphores are
// initialised, the entry is marked not busy, it receives the next workerID and a thread
// running WorkerThread is started with a pointer to that entry.
// mListWorkers is resized before the threads are created, so the element addresses handed
// to the threads are taken after the resize.
// NOTE(review): the declaration of `workerID` is not visible in this excerpt.
155 WorkerThreadPool::WorkerThreadPool(int numThreads):
156 mNumThreads(numThreads)
159 mListWorkers.resize(mNumThreads);
160 for (int i=0;i<mNumThreads;i++)
// NOTE(review): sem_init takes (sem_t*, int pshared, unsigned value); NULL is passed for
// both integer arguments here - 0 would state the intent.
162 sem_init(&mListWorkers[i].block,NULL,NULL);
163 sem_init(&mListWorkers[i].cancel,NULL,NULL);
164 mListWorkers[i].busy=false;
165 mListWorkers[i].workerID=++workerID;
// The return value of pthread_create is not checked.
166 pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
// Hands `worker` to a pool thread: walks mListWorkers under mBlockingMutex, and for the
// chosen entry releases the mutex, posts the entry's `block` semaphore (which wakes
// WorkerThread) and returns the entry's workerID.
// NOTE(review): the selection condition, the assignment of the worker and the return for
// the "no thread available" case are not visible in this excerpt; callers in this file
// compare the result with -1.
170 int16_t WorkerThreadPool::startWork(Worker *worker)
172 pthread_mutex_lock(&mBlockingMutex);
173 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
174 for(;it!=mListWorkers.end();++it)
180 pthread_mutex_unlock(&mBlockingMutex);
181 sem_post(&it->block);
182 return ((int)it->workerID);
185 pthread_mutex_unlock(&mBlockingMutex);
// Requests cancellation of the work running under `workerID`: the cancel semaphore of the
// matching entry is posted, but only if that entry is marked busy.
// NOTE(review): the return statements are not visible in this excerpt, and mBlockingMutex
// is not taken in the visible lines although `busy` is read - confirm.
189 bool WorkerThreadPool::cancelWork(int workerID)
191 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
192 for(;it!=mListWorkers.end();++it)
194 if(it->workerID==workerID && it->busy)
196 sem_post(&it->cancel);
// Called by a pool thread when its worker has returned. Looks up the entry with the given
// thread id while holding mBlockingMutex.
// NOTE(review): what is done with the matching entry is not visible in this excerpt.
// NOTE(review): pthread_t values are compared with ==; POSIX provides pthread_equal for this.
203 void WorkerThreadPool::finishedWork(pthread_t threadID)
205 pthread_mutex_lock(&mBlockingMutex);
206 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
207 for(;it!=mListWorkers.end();++it)
209 if(it->threadID==threadID)
216 pthread_mutex_unlock(&mBlockingMutex);
// Sends a cancellation request to every pool thread.
// NOTE(review): no pthread_join or sem_destroy is visible in this excerpt - confirm.
219 WorkerThreadPool::~WorkerThreadPool()
221 for (int i=0;i<mNumThreads;i++)
223 pthread_cancel(mListWorkers[i].threadID);
// Constructs a worker that belongs to `pool`.
// NOTE(review): the initialiser list and body are not visible in this excerpt.
227 Worker::Worker(WorkerThreadPool *pool):
// Receives the semaphore used to signal cancellation; WorkerThread calls this before start2work().
// NOTE(review): the body is not visible in this excerpt; timedWait waits on mCancelSem.
233 void Worker::setCancelSempaphore(sem_t* cancel)
238 bool Worker::timedWait(timespec timer)
241 if(clock_gettime(0, &temp)==-1)
243 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
245 temp.tv_nsec+=timer.tv_nsec;
246 temp.tv_sec+=timer.tv_sec;
247 //if(sem_wait(mCancelSem)==-1)
248 if (sem_timedwait(mCancelSem,&temp)==-1)
251 if(errno == ETIMEDOUT)
253 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
256 else //failure in waiting, nevertheless, we quit the thread...
258 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
262 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
// Constructs the plugin: no receive interface yet, the domain, sink, source and gateway
// tables are filled by the create*Table helpers, the handle and connection maps start
// empty. Registers the DLT context and logs the construction.
// NOTE(review): some initialiser lines are not visible in this excerpt.
267 AsyncRoutingSender::AsyncRoutingSender():
269 mReceiveInterface(0), //
270 mDomains(createDomainTable()), //
271 mSinks(createSinkTable()), //
272 mSources ( createSourceTable ( ) ), //
273 mGateways ( createGatewayTable ( ) ) , //
274 mMapHandleWorker ( ), //
275 mMapConnectionIDRoute(),//
278 DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
279 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
// Destructor. NOTE(review): the body is not visible in this excerpt.
282 AsyncRoutingSender::~AsyncRoutingSender()
// Stores the receive interface (must not be null) and passes it on to the shadow.
286 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
288 //first, create the Shadow:
289 assert(routingreceiveinterface!=0);
290 mReceiveInterface = routingreceiveinterface;
291 mShadow.setRoutingInterface(routingreceiveinterface);
// Registers the plugin's domains, sources and sinks at the receive interface.
// A failed registration is logged with its error code. The ID returned for a domain is
// written back into the domain table; every source and sink gets the domainID of the
// first domain before it is registered.
// Gateway registration and the interrupt thread are present only as commented-out code.
// NOTE(review): the declaration of eCode and a few short lines are not visible in this excerpt.
294 void AsyncRoutingSender::routingInterfacesReady()
296 assert(mReceiveInterface!=0);
298 //first register the domains
299 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
300 for (; domainIter != mDomains.end(); ++domainIter)
302 am_domainID_t domainID;
303 if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
305 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
307 domainIter->domainID = domainID;
// then the sources
311 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
312 for (; sourceIter != mSources.end(); ++sourceIter)
314 am_sourceID_t sourceID;
315 //set the correct domainID
316 sourceIter->domainID = mDomains[0].domainID;
317 if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
319 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
// then the sinks
324 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
325 for (; sinkIter != mSinks.end(); ++sinkIter)
328 //set the correct domainID
329 sinkIter->domainID = mDomains[0].domainID;
330 if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
332 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
337 // std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
338 // for(;gatewayIter!=mGateways.end();++gatewayIter)
340 // am_gatewayID_t gatewayID;
341 // gatewayIter->domainSinkID=mDomains[0].domainID;
342 // gatewayIter->domainSourceID=mDomains[1].domainID;
343 // gatewayIter->controlDomainID=mDomains[0].domainID;
344 // if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
346 // DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
348 // gatewayIter->gatewayID=gatewayID;
351 //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
352 //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
353 //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
// Rundown notification; the visible code only asserts that a receive interface was set.
356 void AsyncRoutingSender::routingInterfacesRundown()
358 assert(mReceiveInterface!=0);
361 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
363 assert(mReceiveInterface!=0);
364 assert(handle.handle!=0);
366 //first check if we know the handle
367 pthread_mutex_lock(&mMapHandleWorkerMutex);
368 std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
369 if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
371 pthread_mutex_unlock(&mMapHandleWorkerMutex);
372 return (E_NON_EXISTENT);
374 pthread_mutex_unlock(&mMapHandleWorkerMutex);
376 //ok, cancel the action:
377 if (mPool.cancelWork(iter->second)) return (E_OK);
// Starts an asynchronous connect of sourceID to sinkID under `handle`.
// Returns E_NON_EXISTENT if the sink or the source is not in the local tables,
// E_WRONG_FORMAT if connectionFormat is not listed by both of them, and E_NOT_POSSIBLE
// if the pool returned -1 for the new worker. On success the handle is stored together
// with the worker ID in mMapHandleWorker.
// NOTE(review): the local declarations, the loop bodies' remaining lines and the final
// return are not visible in this excerpt.
381 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
383 assert(mReceiveInterface!=0);
384 assert(handle.handle!=0);
385 assert(handle.handleType==H_CONNECT);
386 assert(connectionID!=0);
390 //check if we can take the job
396 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
397 for (; sinkIter != mSinks.end(); ++sinkIter)
399 if (sinkIter->sinkID == sinkID)
405 if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
408 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
409 for (; sourceIter != mSources.end(); ++sourceIter)
411 if (sourceIter->sourceID == sourceID)
413 source = *sourceIter;
417 if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
// both ends have to support the requested connection format
420 if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end()) return (E_WRONG_FORMAT);
421 if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end()) return (E_WRONG_FORMAT);
423 //the operation is ok, lets create a worker, assign it to a task in the task pool
424 asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
425 if ((work = mPool.startWork(worker)) == -1)
427 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
429 return (E_NOT_POSSIBLE);
432 //save the handle related to the workerID
433 pthread_mutex_lock(&mMapHandleWorkerMutex);
434 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
435 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous disconnect of connectionID under `handle`.
// Returns E_NON_EXISTENT if the connection is not in mMapConnectionIDRoute and
// E_NOT_POSSIBLE if the pool returned -1 for the new worker. On success the handle is
// stored together with the worker ID in mMapHandleWorker.
// NOTE(review): the declaration of `work` and the final return are not visible in this excerpt.
440 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
442 assert(mReceiveInterface!=0);
443 assert(handle.handle!=0);
444 assert(handle.handleType==H_DISCONNECT);
445 assert(connectionID!=0);
447 //check if we can take the job
450 pthread_mutex_lock(&mMapConnectionMutex);
451 if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
453 pthread_mutex_unlock(&mMapConnectionMutex);
454 return (E_NON_EXISTENT);
456 pthread_mutex_unlock(&mMapConnectionMutex);
458 //the operation is ok, lets create a worker, assign it to a task in the task pool
459 asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
460 if ((work = mPool.startWork(worker)) == -1)
462 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
464 return (E_NOT_POSSIBLE);
467 //save the handle related to the workerID
468 pthread_mutex_lock(&mMapHandleWorkerMutex);
469 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
470 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of sinkID under `handle`.
// Returns E_NON_EXISTENT if the sink is not in mSinks and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. The worker is given the sink's current volume as its
// starting point. On success the handle is stored with the worker ID in mMapHandleWorker.
// NOTE(review): local declarations, the loop body's remaining lines and the final return
// are not visible in this excerpt.
475 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
477 assert(mReceiveInterface!=0);
478 assert(handle.handle!=0);
479 assert(handle.handleType==H_SETSINKVOLUME);
482 //check if we can take the job
487 pthread_mutex_lock(&mSinksMutex);
488 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
489 for (; sinkIter != mSinks.end(); ++sinkIter)
491 if (sinkIter->sinkID == sinkID)
497 pthread_mutex_unlock(&mSinksMutex);
498 if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
// NOTE(review): sinkIter->volume is read here after mSinksMutex has been released.
500 asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
501 if ((work = mPool.startWork(worker)) == -1)
503 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
505 return (E_NOT_POSSIBLE);
508 //save the handle related to the workerID
509 pthread_mutex_lock(&mMapHandleWorkerMutex);
510 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
511 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of sourceID under `handle`.
// Returns E_NON_EXISTENT if the source is not in mSources and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. The worker is given the source's current volume as its
// starting point. On success the handle is stored with the worker ID in mMapHandleWorker.
// NOTE(review): local declarations and the final return are not visible in this excerpt.
516 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
518 assert(mReceiveInterface!=0);
519 assert(handle.handle!=0);
520 assert(handle.handleType==H_SETSOURCEVOLUME);
523 //check if we can take the job
528 pthread_mutex_lock(&mSourcesMutex);
529 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
530 for (; sourceIter != mSources.end(); ++sourceIter)
532 if (sourceIter->sourceID == sourceID)
534 source = *sourceIter;
538 pthread_mutex_unlock(&mSourcesMutex);
539 if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
// NOTE(review): sourceIter->volume is read here after mSourcesMutex has been released,
// although a copy of the element was taken into `source` under the lock.
541 asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
542 if ((work = mPool.startWork(worker)) == -1)
544 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
546 return (E_NOT_POSSIBLE);
549 //save the handle related to the workerID
550 pthread_mutex_lock(&mMapHandleWorkerMutex);
551 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
552 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous source state change of sourceID under `handle`.
// Returns E_NON_EXISTENT if the source is not in mSources and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. On success the handle is stored with the worker ID in
// mMapHandleWorker.
// NOTE(review): local declarations and the final return are not visible in this excerpt.
557 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
559 assert(mReceiveInterface!=0);
560 assert(handle.handle!=0);
561 assert(handle.handleType==H_SETSOURCESTATE);
564 //check if we can take the job
569 pthread_mutex_lock(&mSourcesMutex);
570 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
571 for (; sourceIter != mSources.end(); ++sourceIter)
573 if (sourceIter->sourceID == sourceID)
575 source = *sourceIter;
579 pthread_mutex_unlock(&mSourcesMutex);
580 if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
582 asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
583 if ((work = mPool.startWork(worker)) == -1)
585 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
587 return (E_NOT_POSSIBLE);
590 //save the handle related to the workerID
591 pthread_mutex_lock(&mMapHandleWorkerMutex);
592 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
593 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous change of one sound property of sinkID under `handle`.
// Returns E_NON_EXISTENT if the sink is not in mSinks and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. On success the handle is stored with the worker ID in
// mMapHandleWorker.
// NOTE(review): local declarations and the final return are not visible in this excerpt.
598 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_SoundProperty_s& soundProperty, const am_sinkID_t sinkID)
600 assert(mReceiveInterface!=0);
601 assert(handle.handle!=0);
602 assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
605 //check if we can take the job
610 pthread_mutex_lock(&mSinksMutex);
611 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
612 for (; sinkIter != mSinks.end(); ++sinkIter)
614 if (sinkIter->sinkID == sinkID)
620 pthread_mutex_unlock(&mSinksMutex);
621 if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found!
623 asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
624 if ((work = mPool.startWork(worker)) == -1)
626 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
628 return (E_NOT_POSSIBLE);
631 //save the handle related to the workerID
632 pthread_mutex_lock(&mMapHandleWorkerMutex);
633 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
634 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Crossfading is not implemented yet (see the todo).
// NOTE(review): the returned error code is not visible in this excerpt.
639 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
641 //todo: implement crossfader
// Starts an asynchronous state change of domainID.
// Returns E_NON_EXISTENT if the domain is not in mDomains and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. No handle is involved, so nothing is stored in
// mMapHandleWorker in the visible lines.
// NOTE(review): local declarations and the final return are not visible in this excerpt.
645 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
647 assert(mReceiveInterface!=0);
650 //check if we can take the job
655 pthread_mutex_lock(&mDomainsMutex);
656 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
657 for (; domainIter != mDomains.end(); ++domainIter)
659 if (domainIter->domainID == domainID)
661 domain = *domainIter;
665 pthread_mutex_unlock(&mDomainsMutex);
666 if (domainIter == mDomains.end()) return (E_NON_EXISTENT); //not found!
668 asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
669 if ((work = mPool.startWork(worker)) == -1)
671 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
673 return (E_NOT_POSSIBLE);
// Starts an asynchronous change of one sound property of sourceID under `handle`.
// Returns E_NON_EXISTENT if the source is not in mSources and E_NOT_POSSIBLE if the pool
// returned -1 for the new worker. On success the handle is stored with the worker ID in
// mMapHandleWorker.
// NOTE(review): local declarations and the final return are not visible in this excerpt.
680 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_SoundProperty_s & soundProperty, const am_sourceID_t sourceID)
682 assert(mReceiveInterface!=0);
683 assert(handle.handle!=0);
684 assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
687 //check if we can take the job
692 pthread_mutex_lock(&mSourcesMutex);
693 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
694 for (; sourceIter != mSources.end(); ++sourceIter)
696 if (sourceIter->sourceID == sourceID)
698 source = *sourceIter;
702 pthread_mutex_unlock(&mSourcesMutex);
703 if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found!
705 asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
706 if ((work = mPool.startWork(worker)) == -1)
// NOTE(review): the log text names asyncSetSourceState although this is
// asyncSetSourceSoundProperty - looks like a copy-paste leftover.
708 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
710 return (E_NOT_POSSIBLE);
713 //save the handle related to the workerID
714 pthread_mutex_lock(&mMapHandleWorkerMutex);
715 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
716 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Writes the bus name of this plugin, "RoutingAsync", into BusName.
// NOTE(review): the return statement is not visible in this excerpt.
721 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
723 BusName = "RoutingAsync";
// Builds the static domain table: two domains on bus "RoutingAsync", named AsyncDomain1
// and AsyncDomain2, both in state DS_CONTROLLED.
// NOTE(review): the declaration of `item`, further field assignments and the return are
// not visible in this excerpt.
727 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
729 //just write two domains into the table and return it
730 std::vector<am_Domain_s> table;
732 item.busname = "RoutingAsync";
735 item.name = "AsyncDomain1";
736 item.nodename = "AsyncNode1";
737 item.state = DS_CONTROLLED;
738 table.push_back(item);
739 item.busname = "RoutingAsync";
742 item.name = "AsyncDomain2";
743 item.nodename = "AsyncNode2";
744 item.state = DS_CONTROLLED;
745 table.push_back(item);
// Builds the static sink table: one entry per i in 0..10 (inclusive), with sinkID i,
// sinkClassID 1 and a name starting with "mySink".
// NOTE(review): `item` is pushed once per iteration and its declaration is not visible;
// if it lives outside the loop, listSoundProperties and listConnectionFormats grow with
// every iteration - confirm.
// NOTE(review): the declaration of `item`, the filling of `temp` and `sp` and the return
// are not visible in this excerpt.
749 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
751 //create a bunch full of sinks
752 std::vector<am_Sink_s> table;
754 am_SoundProperty_s sp;
757 for (int16_t i = 0; i <= 10; i++)
759 std::stringstream temp;
761 item.domainID = 0; //we cannot know this when the table is created !
762 item.name = "mySink" + temp.str();
763 item.sinkID = i; //take fixed ids to make things easy
764 item.sinkClassID = 1;
766 item.listSoundProperties.push_back(sp);
768 item.listConnectionFormats.push_back(CF_ANALOG);
769 table.push_back(item);
// Builds the static source table: one entry per i in 0..10 (inclusive), with sourceID i,
// sourceClassID 1 and a name starting with "mySource".
// NOTE(review): the declaration of `item`, the filling of `temp` and the return are not
// visible in this excerpt.
774 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
776 //create a bunch full of sources
777 std::vector<am_Source_s> table;
779 for (int16_t i = 0; i <= 10; i++)
781 std::stringstream temp;
783 item.domainID = 0; //we cannot know this when the table is created !
784 item.name = "mySource" + temp.str();
785 item.sourceID = i; //take fixed ids to make things easy
786 item.sourceClassID = 1;
789 item.listConnectionFormats.push_back(CF_ANALOG);
790 table.push_back(item);
// Inserts connectionID -> route into mMapConnectionIDRoute while holding mMapConnectionMutex.
// The insert result is ignored.
795 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
797 pthread_mutex_lock(&mMapConnectionMutex);
798 mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
799 pthread_mutex_unlock(&mMapConnectionMutex);
802 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
804 pthread_mutex_lock(&mMapHandleWorkerMutex);
805 if (mMapHandleWorker.erase(handle))
807 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
809 pthread_mutex_unlock(&mMapHandleWorkerMutex);
812 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
814 pthread_mutex_lock(&mMapConnectionMutex);
815 if (mMapConnectionIDRoute.erase(connectionID))
817 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
819 pthread_mutex_unlock(&mMapConnectionMutex);
// Stores `volume` in the mSinks entry with the given sinkID while holding mSinksMutex.
// An unknown sinkID leaves the table unchanged.
822 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
824 pthread_mutex_lock(&mSinksMutex);
825 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
826 for (; sinkIter != mSinks.end(); ++sinkIter)
828 if (sinkIter->sinkID == sinkID)
830 sinkIter->volume = volume;
834 pthread_mutex_unlock(&mSinksMutex);
// Stores `volume` in the mSources entry with the given sourceID while holding mSourcesMutex.
// An unknown sourceID leaves the table unchanged.
837 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
839 pthread_mutex_lock(&mSourcesMutex);
840 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
841 for (; sourceIter != mSources.end(); ++sourceIter)
843 if (sourceIter->sourceID == sourceID)
845 sourceIter->volume = volume;
849 pthread_mutex_unlock(&mSourcesMutex);
// Stores `state` as sourceState of the mSources entry with the given sourceID while
// holding mSourcesMutex. An unknown sourceID leaves the table unchanged.
852 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
854 pthread_mutex_lock(&mSourcesMutex);
855 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
856 for (; sourceIter != mSources.end(); ++sourceIter)
858 if (sourceIter->sourceID == sourceID)
860 sourceIter->sourceState = state;
864 pthread_mutex_unlock(&mSourcesMutex);
// Under mSinksMutex: finds the sink with the given sinkID and, within its
// listSoundProperties, overwrites the value of the property whose type equals
// soundProperty.type. Nothing is added if sink or property type is not found.
867 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
869 pthread_mutex_lock(&mSinksMutex);
870 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
871 for (; sinkIter != mSinks.end(); ++sinkIter)
873 if (sinkIter->sinkID == sinkID)
875 std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
876 for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
878 if (spIterator->type == soundProperty.type)
880 spIterator->value = soundProperty.value;
886 pthread_mutex_unlock(&mSinksMutex);
// Under mSourcesMutex: finds the source with the given sourceID and, within its
// listSoundProperties, overwrites the value of the property whose type equals
// soundProperty.type. Nothing is added if source or property type is not found.
889 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
891 pthread_mutex_lock(&mSourcesMutex);
892 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
893 for (; sourceIter != mSources.end(); ++sourceIter)
895 if (sourceIter->sourceID == sourceID)
897 std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
898 for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
900 if (spIterator->type == soundProperty.type)
902 spIterator->value = soundProperty.value;
908 pthread_mutex_unlock(&mSourcesMutex);
// Stores `domainState` as state of the mDomains entry with the given domainID while
// holding mDomainsMutex. An unknown domainID leaves the table unchanged.
911 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
913 pthread_mutex_lock(&mDomainsMutex);
914 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
915 for (; domainIter != mDomains.end(); ++domainIter)
917 if (domainIter->domainID == domainID)
919 domainIter->state = domainState;
923 pthread_mutex_unlock(&mDomainsMutex);
// Returns the RoutingSendVersion constant as the implemented interface version.
926 uint16_t AsyncRoutingSender::getInterfaceVersion() const
928 return (RoutingSendVersion);
// Variant of asyncSetSinkSoundProperty that takes a list of sound properties.
// NOTE(review): the body is not visible in this excerpt.
931 am_Error_e am::AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const std::vector<am_SoundProperty_s> & listSoundProperties, const am_sinkID_t sinkID)
// Variant of asyncSetSourceSoundProperty that takes a list of sound properties.
// NOTE(review): the body is not visible in this excerpt.
936 am_Error_e am::AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const std::vector<am_SoundProperty_s> & listSoundProperties, const am_sourceID_t sourceID)
// Builds the static gateway table containing one gateway named "myGateway".
// NOTE(review): the declaration of `item`, further field assignments and the return are
// not visible in this excerpt.
941 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
943 std::vector<am_Gateway_s> table;
945 item.name = "myGateway";
948 table.push_back(item);
// Worker for one connect request: passes the pool to the Worker base and stores the
// sender, the shadow and the request parameters in the corresponding members.
952 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
953 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mConnectionID(connectionID), mSourceID(sourceID), mSinkID(sinkID), mConnectionFormat(connectionFormat)
// Performs the simulated connect: waits via timedWait and stops if that returns true;
// otherwise registers the route under mConnectionID at the sender, acknowledges the
// connect with E_OK and removes the handle.
// NOTE(review): the declaration and setup of `t` are not visible in this excerpt.
957 void asycConnectWorker::start2work()
959 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start connecting"));
964 //do something for one second
965 if (timedWait(t)) return;
966 am_RoutingElement_s route;
967 route.sinkID = mSinkID;
968 route.sourceID = mSourceID;
969 route.connectionFormat = mConnectionFormat;
971 //enter new connectionID into the list
972 mAsyncSender->insertConnectionSafe(mConnectionID, route);
// send the ack
975 mShadow->ackConnect(mHandle, mConnectionID, E_OK);
// destroy the handle
978 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancel path of the connect: removes the handle and acknowledges the connect with E_ABORTED.
981 void asycConnectWorker::cancelWork()
983 mAsyncSender->removeHandleSafe(mHandle.handle);
984 mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
// Worker for one disconnect request: passes the pool to the Worker base and stores the
// sender, the shadow, the handle and the connection ID in the corresponding members.
987 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
988 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mConnectionID(connectionID)
992 void asycDisConnectWorker::start2work()
994 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
999 //do something for one second
1000 if (timedWait(t)) return;
1001 am_RoutingElement_s route;
1003 //enter new connectionID into the list
1004 mAsyncSender->insertConnectionSafe(mConnectionID, route);
1007 mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1009 //destroy the handle
1010 mAsyncSender->removeHandleSafe(mHandle.handle);
// Cancel path of the disconnect: removes the handle and acknowledges the disconnect with E_ABORTED.
1014 void asycDisConnectWorker::cancelWork()
1016 mAsyncSender->removeHandleSafe(mHandle.handle);
1017 mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
// Worker for one sink volume change: passes the pool to the Worker base and stores the
// sender, the shadow, the starting volume and the request parameters in the members.
1020 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1021 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mCurrentVolume(currentVolume), mHandle(handle), mSinkID(sinkID), mVolume(volume), mRamp(ramp), mTime(time)
// Moves mCurrentVolume towards mVolume in a loop. Each pass reports the current value
// with ackSinkVolumeTick and then waits via timedWait, stopping if that returns true.
// When the target is reached the volume is stored in the sink table, the change is
// acknowledged with E_OK and the handle is removed.
// NOTE(review): the statements changing mCurrentVolume and the rest of the setup of `t`
// are not visible in this excerpt.
1025 void asyncSetSinkVolumeWorker::start2work()
1027 //todo: this implementation does not respect time and ramp....
1028 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
// 10 ms in nanoseconds
1030 t.tv_nsec = 10000000;
1033 while (mCurrentVolume != mVolume)
1035 if (mCurrentVolume < mVolume)
1039 mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
1040 if (timedWait(t)) return;
1043 //store the reached volume in the sink table
1044 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
// send the ack
1047 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1049 //destroy the handle
1050 mAsyncSender->removeHandleSafe(mHandle.handle);
1053 void asyncSetSinkVolumeWorker::cancelWork()
1055 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1056 mAsyncSender->removeHandleSafe(mHandle.handle);
1057 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1060 am::asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1061 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mCurrentVolume(currentVolume), mHandle(handle), mSourceID(SourceID), mVolume(volume), mRamp(ramp), mTime(time)
// Work routine of the source volume worker: as long as mCurrentVolume differs from mVolume
// it sends a volume tick and waits via timedWait(t); when timedWait(t) returns true the
// routine returns immediately. After the loop the volume is passed to the sender, the
// change is acknowledged with E_OK and the handle is removed.
// NOTE(review): the declaration of `t` and the statements that step mCurrentVolume are
// not visible in this excerpt - confirm against the full file.
1065 void am::asyncSetSourceVolumeWorker::start2work()
1067 //todo: this implementation does not respect time and ramp....
1068 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
1070 t.tv_nsec = 10000000;
1073 while (mCurrentVolume != mVolume)
1075 if (mCurrentVolume < mVolume)
// report the intermediate volume, then wait before the next step
1079 mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
1080 if (timedWait(t)) return;
1083 //pass the reached volume of the source to the sender
1084 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
// acknowledge the completed volume change
1087 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1089 //destroy the handle
1090 mAsyncSender->removeHandleSafe(mHandle.handle);
1093 void am::asyncSetSourceVolumeWorker::cancelWork()
1095 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1096 mAsyncSender->removeHandleSafe(mHandle.handle);
1097 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
1100 am::asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1101 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(handle), mSourceID(sourceID), mSourcestate(state)
// Work routine of the source state worker: logs the start, waits via timedWait(t) and
// leaves early when that call returns true; otherwise the state is passed to the sender,
// the request is acknowledged with E_OK and the handle is removed.
// NOTE(review): the declaration and setup of `t` are not visible in this excerpt; the
// comment below suggests a one-second wait - confirm.
1105 void am::asyncSetSourceStateWorker::start2work()
1107 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1112 //do something for one second
1113 if (timedWait(t)) return;
1115 //pass the new source state to the sender
1116 mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
// acknowledge the completed state change
1119 mShadow->ackSetSourceState(mHandle, E_OK);
1121 //destroy the handle
1122 mAsyncSender->removeHandleSafe(mHandle.handle);
1125 void am::asyncSetSourceStateWorker::cancelWork()
1128 mShadow->ackSetSourceState(mHandle, E_ABORTED);
1130 //destroy the handle
1131 mAsyncSender->removeHandleSafe(mHandle.handle);
1134 am::asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1135 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(), mSinkID(sinkID), mSoundProperty(soundProperty)
// Work routine of the sink sound property worker: logs the start, waits via timedWait(t)
// and leaves early when that call returns true; otherwise the property is passed to the
// sender, the request is acknowledged with E_OK and the handle is removed.
// NOTE(review): the declaration and setup of `t` are not visible in this excerpt; the
// comment below suggests a one-second wait - confirm.
1139 void am::asyncSetSinkSoundPropertyWorker::start2work()
1141 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1146 //do something for one second
1147 if (timedWait(t)) return;
1149 //pass the new sound property of the sink to the sender
1150 mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
// acknowledge the completed property change
1153 mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1155 //destroy the handle
1156 mAsyncSender->removeHandleSafe(mHandle.handle);
1159 void am::asyncSetSinkSoundPropertyWorker::cancelWork()
1162 mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1164 //destroy the handle
1165 mAsyncSender->removeHandleSafe(mHandle.handle);
1168 am::asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1169 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mHandle(), mSourceID(sourceID), mSoundProperty(soundProperty)
// Work routine of the source sound property worker: logs the start, waits via timedWait(t)
// and leaves early when that call returns true; otherwise the property is passed to the
// sender, the request is acknowledged with E_OK and the handle is removed.
// NOTE(review): the declaration and setup of `t` are not visible in this excerpt; the
// comment below suggests a one-second wait - confirm.
1173 void am::asyncSetSourceSoundPropertyWorker::start2work()
1175 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1180 //do something for one second
1181 if (timedWait(t)) return;
1183 //pass the new sound property of the source to the sender
1184 mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
// acknowledge the completed property change
1187 mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1189 //destroy the handle
1190 mAsyncSender->removeHandleSafe(mHandle.handle);
1193 void am::asyncSetSourceSoundPropertyWorker::cancelWork()
1196 mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1198 //destroy the handle
1199 mAsyncSender->removeHandleSafe(mHandle.handle);
1202 am::asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1203 Worker(pool), mAsyncSender(asyncSender), mShadow(shadow), mDomainID(domainID), mDomainState(domainState)
// Work routine of the domain state worker: logs the start, waits via timedWait(t) and
// leaves early when that call returns true; otherwise the domain state is passed to the
// sender and reported through hookDomainStateChange.
// NOTE(review): the declaration and setup of `t` are not visible in this excerpt; the
// comment below suggests a one-second wait - confirm.
1207 void am::asyncDomainStateChangeWorker::start2work()
1209 //todo: sendchanged data must be in here !
// NOTE(review): the log text below says "source sound property" although this worker
// changes a domain state - looks like a copy/paste leftover; confirm and correct.
1210 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1215 //do something for one second
1216 if (timedWait(t)) return;
1218 //pass the new domain state to the sender
1219 mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1220 mShadow->hookDomainStateChange(mDomainID, mDomainState);
1221 //send the new status
// Cancel routine of the domain state worker: reports mDomainState for mDomainID through
// hookDomainStateChange. No call to the sender is visible in this excerpt.
// NOTE(review): the end of this function is not visible here - confirm nothing follows.
1225 void am::asyncDomainStateChangeWorker::cancelWork()
1227 //send the new status
1228 mShadow->hookDomainStateChange(mDomainID, mDomainState);