2 * Copyright (C) 2011, BMW AG
4 * GeniviAudioManager DbusPlugin
6 * \file RoutingSender.cpp
8 * \date 20-Oct-2011 3:42:04 PM
9 * \author Christian Mueller (christian.ei.mueller@bmw.de)
12 * GNU Lesser General Public License, version 2.1, with special exception (GENIVI clause)
13 * Copyright (C) 2011, BMW AG Christian Mueller Christian.ei.mueller@bmw.de
15 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License, version 2.1, as published by the Free Software Foundation.
16 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License, version 2.1, for more details.
17 * You should have received a copy of the GNU Lesser General Public License, version 2.1, along with this program; if not, see <http://www.gnu.org/licenses/lgpl-2.1.html>.
18 * Note that the copyright holders assume that the GNU Lesser General Public License, version 2.1, may also be applicable to programs even in cases in which the program is not a library in the technical sense.
19 * Linking AudioManager statically or dynamically with other modules is making a combined work based on AudioManager. You may license such other modules under the GNU Lesser General Public License, version 2.1. If you do not want to license your linked modules under the GNU Lesser General Public License, version 2.1, you may use the program under the following exception.
20 * As a special exception, the copyright holders of AudioManager give you permission to combine AudioManager with software programs or libraries that are released under any license unless such a combination is not permitted by the license of such a software program or library. You may copy and distribute such a system following the terms of the GNU Lesser General Public License, version 2.1, including this special exception, for AudioManager and the licenses of the other code concerned.
21 * Note that people who make modified versions of AudioManager are not obligated to grant this special exception for their modified versions; it is their choice whether to do so. The GNU Lesser General Public License, version 2.1, gives permission to release a modified version without this exception; this exception also makes it possible to release a modified version which carries forward this exception.
25 #include "RoutingSenderAsyn.h"
26 #include "DltContext.h"
35 #include <dbus/dbus.h>
// DLT logging context used by the DLT_LOG calls in this file; it is registered in the AsyncRoutingSender constructor.
39 DLT_DECLARE_CONTEXT(PluginRoutingAsync)
41 extern "C" RoutingSendInterface* PluginRoutingInterfaceAsyncFactory()
43 return (new AsyncRoutingSender());
46 extern "C" void destroyRoutingPluginInterfaceAsync(RoutingSendInterface* routingSendInterface)
48 delete routingSendInterface;
// Definitions of the static pthread mutexes, all statically initialised with PTHREAD_MUTEX_INITIALIZER.
// In this file mMapConnectionMutex is held around mMapConnectionIDRoute, mMapHandleWorkerMutex around
// mMapHandleWorker, mSinksMutex / mSourcesMutex / mDomainsMutex around the respective tables, and
// WorkerThreadPool::mBlockingMutex around the pool's worker list.
51 pthread_mutex_t AsyncRoutingSender::mMapConnectionMutex = PTHREAD_MUTEX_INITIALIZER;
52 pthread_mutex_t AsyncRoutingSender::mMapHandleWorkerMutex = PTHREAD_MUTEX_INITIALIZER;
53 pthread_mutex_t AsyncRoutingSender::mSinksMutex= PTHREAD_MUTEX_INITIALIZER;
54 pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
55 pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
56 pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
// D-Bus test listener with the pthread start-routine signature; data is cast to a RoutingReceiverAsyncShadow*.
// Connects to the session bus, requests the name "org.genivi.test" and forwards the method calls
// timingChanged, SinkAvailablityStatusChange, SourceAvailablityStatusChange and InterruptStatusChange
// to the matching hook of the shadow. Every handled call is answered with a method return that
// echoes the first argument. The only visible starter is the commented-out pthread_create in
// routingInterfacesReady().
// NOTE(review): none of the D-Bus results (dbus_bus_get, dbus_bus_request_name, dbus_message_iter_init,
// dbus_message_new_method_return, dbus_connection_send) is checked in the lines shown here.
58 void* AsyncRoutingSender::InterruptEvents(void *data)
60 RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
64 dbus_error_init(&err);
65 conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
66 dbus_uint32_t serial = 0;
69 dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
// blocks (timeout -1) until there is traffic on the connection
71 while (dbus_connection_read_write_dispatch(conn, -1))
73 dbus_connection_read_write(conn, 0);
// NOTE(review): no dbus_message_unref(msg) for the popped message is visible in this function - confirm it is released.
74 msg = dbus_connection_pop_message(conn);
76 if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
78 am_connectionID_t connectionID;
// arguments are read in order: connection ID first, then the delay
80 dbus_message_iter_init(msg, &args);
81 dbus_message_iter_get_basic(&args,(void*) &connectionID);
82 dbus_message_iter_next(&args);
83 dbus_message_iter_get_basic(&args,(void*) &delay);
84 reply = dbus_message_new_method_return(msg);
85 dbus_message_iter_init_append(reply, &args);
// NOTE(review): appended as DBUS_TYPE_INT32 - verify that am_connectionID_t is really 32 bits wide.
86 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
87 dbus_connection_send(conn, reply, &serial);
88 shadow->hookTimingInformationChanged(connectionID,delay);
89 dbus_message_unref(reply);
91 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
94 am_Availability_s availability;
95 dbus_message_iter_init(msg, &args);
96 dbus_message_iter_get_basic(&args,(void*) &sinkID);
97 reply = dbus_message_new_method_return(msg);
98 dbus_message_iter_init_append(reply, &args);
99 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
100 dbus_connection_send(conn, reply, &serial);
// NOTE(review): availability is never assigned between its declaration and this call.
101 shadow->hookSinkAvailablityStatusChange(sinkID,availability);
102 dbus_message_unref(reply);
104 else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
106 am_sourceID_t sourceID;
107 am_Availability_s availability;
108 dbus_message_iter_init(msg, &args);
109 dbus_message_iter_get_basic(&args,(void*) &sourceID);
110 reply = dbus_message_new_method_return(msg);
111 dbus_message_iter_init_append(reply, &args);
112 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
113 dbus_connection_send(conn, reply, &serial);
// NOTE(review): availability is never assigned between its declaration and this call.
114 shadow->hookSourceAvailablityStatusChange(sourceID,availability);
115 dbus_message_unref(reply);
117 else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
119 am_sourceID_t sourceID;
121 am_InterruptState_e state=IS_MIN;
122 dbus_message_iter_init(msg, &args);
123 dbus_message_iter_get_basic(&args,(void*) &sourceID);
124 reply = dbus_message_new_method_return(msg);
125 dbus_message_iter_init_append(reply, &args);
126 dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
127 dbus_connection_send(conn, reply, &serial);
// state is not read from the message in the visible lines, so the hook receives the initial IS_MIN
128 shadow->hookInterruptStatusChange(sourceID,state);
129 dbus_message_unref(reply);
131 dbus_connection_flush(conn);
// Start routine of the pool threads (passed to pthread_create in the WorkerThreadPool constructor).
// data points to the thread's own threadInfo_s slot. The thread waits on the slot's block semaphore,
// reads the assigned Worker while holding mBlockingMutex, hands the worker the slot's cancel semaphore,
// runs start2work() and finally reports completion to the pool with finishedWork().
// NOTE(review): presumably this sequence repeats in a loop - the loop construct is not visible here.
136 void *WorkerThreadPool::WorkerThread(void* data)
138 threadInfo_s *myInfo=(threadInfo_s*)data;
// sleep until startWork() posts this slot's semaphore
141 sem_wait(&myInfo->block);
142 pthread_mutex_lock(&mBlockingMutex);
143 Worker* actWorker=myInfo->worker;
144 pthread_mutex_unlock(&mBlockingMutex);
145 actWorker->setCancelSempaphore(&myInfo->cancel);
146 actWorker->start2work();
147 actWorker->pPool->finishedWork(myInfo->threadID);
// Creates a pool of numThreads threads. mListWorkers is resized once up front; for every slot both
// semaphores are initialised, the slot is marked idle, it receives the next worker ID and a thread
// running WorkerThread is started with a pointer to the slot.
// NOTE(review): the threads keep pointers into mListWorkers, so the vector must not be resized later.
// NOTE(review): the result of pthread_create is not checked.
152 WorkerThreadPool::WorkerThreadPool(int numThreads):
153 mNumThreads(numThreads)
156 mListWorkers.resize(mNumThreads);
157 for (int i=0;i<mNumThreads;i++)
// NOTE(review): sem_init takes (sem_t*, int pshared, unsigned value); NULL is passed where 0 is meant.
159 sem_init(&mListWorkers[i].block,NULL,NULL);
160 sem_init(&mListWorkers[i].cancel,NULL,NULL);
161 mListWorkers[i].busy=false;
162 mListWorkers[i].workerID=++workerID;
163 pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
// Hands worker over to a pool thread. Walks mListWorkers under mBlockingMutex; for the chosen slot the
// mutex is released, the slot's block semaphore is posted to wake its thread and the slot's workerID
// is returned. Callers in this file treat a return value of -1 as "no thread available".
// NOTE(review): the slot selection and the assignment of worker to the slot are not visible here.
167 int16_t WorkerThreadPool::startWork(Worker *worker)
169 pthread_mutex_lock(&mBlockingMutex);
170 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
171 for(;it!=mListWorkers.end();++it)
177 pthread_mutex_unlock(&mBlockingMutex);
178 sem_post(&it->block);
179 return ((int)it->workerID);
// reached when the loop ended without returning
182 pthread_mutex_unlock(&mBlockingMutex);
// Requests cancellation of a running job: posts the cancel semaphore of the busy slot whose
// workerID matches. The returned values are not visible here.
// NOTE(review): mListWorkers is read without mBlockingMutex, unlike startWork() and finishedWork().
186 bool WorkerThreadPool::cancelWork(int workerID)
188 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
189 for(;it!=mListWorkers.end();++it)
191 if(it->workerID==workerID && it->busy)
193 sem_post(&it->cancel);
// Called by a pool thread after start2work() returned. Under mBlockingMutex the slot belonging to
// threadID is looked up; what is done with the slot is not visible here.
200 void WorkerThreadPool::finishedWork(pthread_t threadID)
202 pthread_mutex_lock(&mBlockingMutex);
203 std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
204 for(;it!=mListWorkers.end();++it)
// NOTE(review): pthread_t values are compared with ==; POSIX provides pthread_equal() for this.
206 if(it->threadID==threadID)
213 pthread_mutex_unlock(&mBlockingMutex);
216 WorkerThreadPool::~WorkerThreadPool()
218 for (int i=0;i<mNumThreads;i++)
220 pthread_cancel(mListWorkers[i].threadID);
// Constructs a worker for the given pool. The initialiser list is not visible here; presumably pool
// is stored in pPool, which WorkerThread uses to report finished work - confirm.
224 Worker::Worker(WorkerThreadPool *pool):
// Receives the semaphore that signals cancellation of this worker; WorkerThread passes the cancel
// semaphore of its slot before calling start2work(). Presumably stored in mCancelSem, on which
// timedWait() waits - the body is not visible here.
230 void Worker::setCancelSempaphore(sem_t* cancel)
// Waits on the cancel semaphore for at most the relative time given in timer: the current time is
// read with clock_gettime, timer is added to obtain the absolute deadline and sem_timedwait is
// called with it. The returned values are not visible here.
235 bool Worker::timedWait(timespec timer)
// clock id 0 - presumably CLOCK_REALTIME, the clock sem_timedwait measures against; confirm
238 if(clock_gettime(0, &temp)==-1)
240 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait error on getting time"));
// NOTE(review): tv_nsec is not normalised after the addition; a sum of 1000000000 or more makes
// sem_timedwait fail with EINVAL instead of waiting.
242 temp.tv_nsec+=timer.tv_nsec;
243 temp.tv_sec+=timer.tv_sec;
244 //if(sem_wait(mCancelSem)==-1)
245 if (sem_timedwait(mCancelSem,&temp)==-1)
// the deadline passed without the semaphore being posted
248 if(errno == ETIMEDOUT)
250 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait timeout waiting error"));
253 else //failure in waiting, nevertheless, we quit the thread...
255 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
// NOTE(review): this second "semaphore waiting error" log appears to sit after the error branch,
// i.e. on the path where the semaphore was actually posted - the text looks copy-pasted; confirm.
259 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("Worker::timedWait semaphore waiting error"));
// Constructor: starts without a receive interface (0), fills the domain, sink, source and gateway
// tables from the create*Table() helpers, creates empty handle and connection maps, then registers
// the DLT context ("ASY") and logs the construction.
264 AsyncRoutingSender::AsyncRoutingSender():
266 mReceiveInterface(0), //
267 mDomains(createDomainTable()), //
268 mSinks(createSinkTable()), //
269 mSources ( createSourceTable ( ) ), //
270 mGateways ( createGatewayTable ( ) ) , //
271 mMapHandleWorker ( ), //
272 mMapConnectionIDRoute(),//
275 DLT_REGISTER_CONTEXT(PluginRoutingAsync,"ASY","Async Plugin");
276 DLT_LOG(PluginRoutingAsync,DLT_LOG_INFO, DLT_STRING("AsyncRoutingSender constructed"));
// Destructor; no statements of its body are visible here.
279 AsyncRoutingSender::~AsyncRoutingSender()
283 void AsyncRoutingSender::startupRoutingInterface(RoutingReceiveInterface *routingreceiveinterface)
285 //first, create the Shadow:
286 assert(routingreceiveinterface!=0);
287 mReceiveInterface = routingreceiveinterface;
288 mShadow.setRoutingInterface(routingreceiveinterface);
// Registers the locally defined domains, sources and sinks with the receive interface.
// Domains come first and the ID returned by registerDomain is written back into the table; every
// source and sink is then assigned the ID of the first domain before it is registered.
// Failures are logged with the returned error code. Gateway registration and the start of the
// interrupt thread are commented out.
291 void AsyncRoutingSender::routingInterfacesReady()
293 assert(mReceiveInterface!=0);
295 //first register the domains
296 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
297 for (; domainIter != mDomains.end(); ++domainIter)
299 am_domainID_t domainID;
300 if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
302 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with"), DLT_INT(eCode));
304 domainIter->domainID = domainID;
308 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
309 for (; sourceIter != mSources.end(); ++sourceIter)
311 am_sourceID_t sourceID;
312 //set the correct domainID
313 sourceIter->domainID = mDomains[0].domainID;
314 if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
316 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with"), DLT_INT(eCode));
321 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
322 for (; sinkIter != mSinks.end(); ++sinkIter)
325 //set the correct domainID
326 sinkIter->domainID = mDomains[0].domainID;
327 if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
329 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with"), DLT_INT(eCode));
334 // std::vector<am_Gateway_s>::iterator gatewayIter=mGateways.begin();
335 // for(;gatewayIter!=mGateways.end();++gatewayIter)
337 // am_gatewayID_t gatewayID;
338 // gatewayIter->domainSinkID=mDomains[0].domainID;
339 // gatewayIter->domainSourceID=mDomains[1].domainID;
340 // gatewayIter->controlDomainID=mDomains[0].domainID;
341 // if((eCode=mReceiveInterface->registerGateway(*gatewayIter,gatewayID))!=E_OK)
343 // DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::routingInterfacesReady error on registering gateway, failed with"), DLT_INT(eCode));
345 // gatewayIter->gatewayID=gatewayID;
348 //create thread for interrupts, but only if we are testing - otherwise we get 100% cpu load:
349 //todo: find a solution for the 100% dbus load to uncomment this and make interrupt tests work
350 //pthread_create(&mInterruptThread,NULL,&AsyncRoutingSender::InterruptEvents,&mShadow);
353 void AsyncRoutingSender::routingInterfacesRundown()
355 assert(mReceiveInterface!=0);
358 am_Error_e AsyncRoutingSender::asyncAbort(const am_Handle_s handle)
360 assert(mReceiveInterface!=0);
361 assert(handle.handle!=0);
363 //first check if we know the handle
364 pthread_mutex_lock(&mMapHandleWorkerMutex);
365 std::map<uint16_t, int16_t>::iterator iter = mMapHandleWorker.begin();
366 if (mMapHandleWorker.find(handle.handle) == mMapHandleWorker.end())
368 pthread_mutex_unlock(&mMapHandleWorkerMutex);
369 return (E_NON_EXISTENT);
371 pthread_mutex_unlock(&mMapHandleWorkerMutex);
373 //ok, cancel the action:
374 if (mPool.cancelWork(iter->second))
// Starts an asynchronous connect of sourceID to sinkID using connectionFormat.
// Returns E_NON_EXISTENT when the sink or the source is not in the local tables, E_WRONG_FORMAT
// when either side does not list connectionFormat and E_NOT_POSSIBLE when the pool has no thread
// available (startWork() returned -1). Otherwise an asycConnectWorker is handed to the pool and the
// handle is stored with the worker ID in mMapHandleWorker, where asyncAbort() looks it up.
// NOTE(review): mSinks and mSources are searched without mSinksMutex / mSourcesMutex in the visible
// lines, unlike the other async* functions.
379 am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat)
381 assert(mReceiveInterface!=0);
382 assert(handle.handle!=0);
383 assert(handle.handleType==H_CONNECT);
384 assert(connectionID!=0);
388 //check if we can take the job
394 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
395 for (; sinkIter != mSinks.end(); ++sinkIter)
397 if (sinkIter->sinkID == sinkID)
403 if (sinkIter == mSinks.end())
404 return (E_NON_EXISTENT); //not found!
407 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
408 for (; sourceIter != mSources.end(); ++sourceIter)
410 if (sourceIter->sourceID == sourceID)
412 source = *sourceIter;
416 if (sourceIter == mSources.end())
417 return (E_NON_EXISTENT); //not found!
// both ends must support the requested connection format
420 if (std::find(source.listConnectionFormats.begin(), source.listConnectionFormats.end(), connectionFormat) == source.listConnectionFormats.end())
421 return (E_WRONG_FORMAT);
422 if (std::find(sink.listConnectionFormats.begin(), sink.listConnectionFormats.end(), connectionFormat) == sink.listConnectionFormats.end())
423 return (E_WRONG_FORMAT);
425 //the operation is ok, lets create a worker, assign it to a task in the task pool
426 asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
427 if ((work = mPool.startWork(worker)) == -1)
429 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncConnect not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
431 return (E_NOT_POSSIBLE);
434 //save the handle related to the workerID
// NOTE(review): the job was already started by startWork(); the entry is added only now.
435 pthread_mutex_lock(&mMapHandleWorkerMutex);
436 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
437 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous disconnect of connectionID.
// Returns E_NON_EXISTENT when the connection is not in mMapConnectionIDRoute and E_NOT_POSSIBLE when
// the pool has no thread available. Otherwise an asycDisConnectWorker is handed to the pool and the
// handle is stored with the worker ID in mMapHandleWorker.
442 am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID)
444 assert(mReceiveInterface!=0);
445 assert(handle.handle!=0);
446 assert(handle.handleType==H_DISCONNECT);
447 assert(connectionID!=0);
449 //check if we can take the job
452 pthread_mutex_lock(&mMapConnectionMutex);
453 if (mMapConnectionIDRoute.find(connectionID) == mMapConnectionIDRoute.end())
455 pthread_mutex_unlock(&mMapConnectionMutex);
456 return (E_NON_EXISTENT);
458 pthread_mutex_unlock(&mMapConnectionMutex);
460 //the operation is ok, lets create a worker, assign it to a task in the task pool
461 asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
462 if ((work = mPool.startWork(worker)) == -1)
464 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncDisconnect not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
466 return (E_NOT_POSSIBLE);
469 //save the handle related to the workerID
470 pthread_mutex_lock(&mMapHandleWorkerMutex);
471 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
472 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of sinkID towards volume.
// Returns E_NON_EXISTENT when the sink is not in mSinks and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncSetSinkVolumeWorker, seeded with the sink's current volume,
// is handed to the pool and the handle is stored with the worker ID in mMapHandleWorker.
477 am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
479 assert(mReceiveInterface!=0);
480 assert(handle.handle!=0);
481 assert(handle.handleType==H_SETSINKVOLUME);
484 //check if we can take the job
489 pthread_mutex_lock(&mSinksMutex);
490 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
491 for (; sinkIter != mSinks.end(); ++sinkIter)
493 if (sinkIter->sinkID == sinkID)
499 pthread_mutex_unlock(&mSinksMutex);
500 if (sinkIter == mSinks.end())
501 return (E_NON_EXISTENT); //not found!
// NOTE(review): sinkIter->volume is read after mSinksMutex has been released.
503 asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
504 if ((work = mPool.startWork(worker)) == -1)
506 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
508 return (E_NOT_POSSIBLE);
511 //save the handle related to the workerID
512 pthread_mutex_lock(&mMapHandleWorkerMutex);
513 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
514 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous volume change of sourceID towards volume.
// Returns E_NON_EXISTENT when the source is not in mSources and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncSetSourceVolumeWorker, seeded with the source's current
// volume, is handed to the pool and the handle is stored with the worker ID in mMapHandleWorker.
519 am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time)
521 assert(mReceiveInterface!=0);
522 assert(handle.handle!=0);
523 assert(handle.handleType==H_SETSOURCEVOLUME);
526 //check if we can take the job
531 pthread_mutex_lock(&mSourcesMutex);
532 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
533 for (; sourceIter != mSources.end(); ++sourceIter)
535 if (sourceIter->sourceID == sourceID)
537 source = *sourceIter;
541 pthread_mutex_unlock(&mSourcesMutex);
542 if (sourceIter == mSources.end())
543 return (E_NON_EXISTENT); //not found!
// NOTE(review): sourceIter->volume is read after mSourcesMutex has been released.
545 asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
546 if ((work = mPool.startWork(worker)) == -1)
548 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
550 return (E_NOT_POSSIBLE);
553 //save the handle related to the workerID
554 pthread_mutex_lock(&mMapHandleWorkerMutex);
555 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
556 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous change of the state of sourceID.
// Returns E_NON_EXISTENT when the source is not in mSources and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncSetSourceStateWorker is handed to the pool and the handle is
// stored with the worker ID in mMapHandleWorker.
561 am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state)
563 assert(mReceiveInterface!=0);
564 assert(handle.handle!=0);
565 assert(handle.handleType==H_SETSOURCESTATE);
568 //check if we can take the job
573 pthread_mutex_lock(&mSourcesMutex);
574 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
575 for (; sourceIter != mSources.end(); ++sourceIter)
577 if (sourceIter->sourceID == sourceID)
579 source = *sourceIter;
583 pthread_mutex_unlock(&mSourcesMutex);
584 if (sourceIter == mSources.end())
585 return (E_NON_EXISTENT); //not found!
587 asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
588 if ((work = mPool.startWork(worker)) == -1)
590 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
592 return (E_NOT_POSSIBLE);
595 //save the handle related to the workerID
596 pthread_mutex_lock(&mMapHandleWorkerMutex);
597 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
598 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Starts an asynchronous change of one sound property of sinkID.
// Returns E_NON_EXISTENT when the sink is not in mSinks and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncSetSinkSoundPropertyWorker is handed to the pool and the
// handle is stored with the worker ID in mMapHandleWorker.
603 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handle, const am_sinkID_t sinkID, const am_SoundProperty_s & soundProperty)
605 assert(mReceiveInterface!=0);
606 assert(handle.handle!=0);
607 assert(handle.handleType==H_SETSINKSOUNDPROPERTY);
610 //check if we can take the job
615 pthread_mutex_lock(&mSinksMutex);
616 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
617 for (; sinkIter != mSinks.end(); ++sinkIter)
619 if (sinkIter->sinkID == sinkID)
625 pthread_mutex_unlock(&mSinksMutex);
626 if (sinkIter == mSinks.end())
627 return (E_NON_EXISTENT); //not found!
629 asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
630 if ((work = mPool.startWork(worker)) == -1)
632 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
634 return (E_NOT_POSSIBLE);
637 //save the handle related to the workerID
638 pthread_mutex_lock(&mMapHandleWorkerMutex);
639 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
640 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Crossfading is not implemented yet (see the todo); the body and its return value are not visible here.
645 am_Error_e AsyncRoutingSender::asyncCrossFade(const am_Handle_s handle, const am_crossfaderID_t crossfaderID, const am_HotSink_e hotSink, const am_RampType_e rampType, const am_time_t time)
647 //todo: implement crossfader
// Starts an asynchronous state change of domainID.
// Returns E_NON_EXISTENT when the domain is not in mDomains and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncDomainStateChangeWorker is handed to the pool. No handle is
// involved and nothing is stored in mMapHandleWorker in the visible lines.
656 am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, const am_DomainState_e domainState)
658 assert(mReceiveInterface!=0);
661 //check if we can take the job
666 pthread_mutex_lock(&mDomainsMutex);
667 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
668 for (; domainIter != mDomains.end(); ++domainIter)
670 if (domainIter->domainID == domainID)
672 domain = *domainIter;
676 pthread_mutex_unlock(&mDomainsMutex);
677 if (domainIter == mDomains.end())
678 return (E_NON_EXISTENT); //not found!
680 asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
681 if ((work = mPool.startWork(worker)) == -1)
683 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::setDomainState not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
685 return (E_NOT_POSSIBLE);
// Starts an asynchronous change of one sound property of sourceID.
// Returns E_NON_EXISTENT when the source is not in mSources and E_NOT_POSSIBLE when the pool has no
// thread available. Otherwise an asyncSetSourceSoundPropertyWorker is handed to the pool and the
// handle is stored with the worker ID in mMapHandleWorker.
692 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s handle, const am_sourceID_t sourceID, const am_SoundProperty_s & soundProperty)
694 assert(mReceiveInterface!=0);
695 assert(handle.handle!=0);
696 assert(handle.handleType==H_SETSOURCESOUNDPROPERTY);
699 //check if we can take the job
704 pthread_mutex_lock(&mSourcesMutex);
705 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
706 for (; sourceIter != mSources.end(); ++sourceIter)
708 if (sourceIter->sourceID == sourceID)
710 source = *sourceIter;
714 pthread_mutex_unlock(&mSourcesMutex);
715 if (sourceIter == mSources.end())
716 return (E_NON_EXISTENT); //not found!
718 asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
719 if ((work = mPool.startWork(worker)) == -1)
// NOTE(review): the message below names asyncSetSourceState although this is asyncSetSourceSoundProperty.
721 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::asyncSetSourceState not enough threads!"));
// NOTE(review): verify that the worker allocated above is released on this path.
723 return (E_NOT_POSSIBLE);
726 //save the handle related to the workerID
727 pthread_mutex_lock(&mMapHandleWorkerMutex);
728 mMapHandleWorker.insert(std::make_pair(handle.handle, work));
729 pthread_mutex_unlock(&mMapHandleWorkerMutex);
// Writes the bus name of this plugin, "RoutingAsync", into BusName (the same string is used as busname
// of the domains in createDomainTable()). The returned error code is not visible here.
734 am_Error_e AsyncRoutingSender::returnBusName(std::string & BusName) const
736 BusName = "RoutingAsync";
// Builds the static domain table used to initialise mDomains: two entries, "AsyncDomain1"/"AsyncNode1"
// and "AsyncDomain2"/"AsyncNode2", both on bus "RoutingAsync" and in state DS_CONTROLLED.
// Further field assignments of each entry are not visible here.
740 std::vector<am_Domain_s> AsyncRoutingSender::createDomainTable()
742 //just write two domains into the table and return it
743 std::vector<am_Domain_s> table;
745 item.busname = "RoutingAsync";
748 item.name = "AsyncDomain1";
749 item.nodename = "AsyncNode1";
750 item.state = DS_CONTROLLED;
751 table.push_back(item);
752 item.busname = "RoutingAsync";
755 item.name = "AsyncDomain2";
756 item.nodename = "AsyncNode2";
757 item.state = DS_CONTROLLED;
758 table.push_back(item);
// Builds the static sink table used to initialise mSinks: eleven sinks with the fixed IDs 0..10
// (the loop bound is inclusive), named "mySink" plus a suffix taken from temp, sink class 1,
// domainID 0 as placeholder (set in routingInterfacesReady()), connection format CF_ANALOG.
// NOTE(review): sp and CF_ANALOG are appended to item's lists in every iteration; unless item is
// reset in lines not visible here, later sinks accumulate the entries of earlier ones.
762 std::vector<am_Sink_s> AsyncRoutingSender::createSinkTable()
764 //create a bunch full of sinks
765 std::vector<am_Sink_s> table;
767 am_SoundProperty_s sp;
770 for (int16_t i = 0; i <= 10; i++)
772 std::stringstream temp;
774 item.domainID = 0; //we cannot know this when the table is created !
775 item.name = "mySink" + temp.str();
776 item.sinkID = i; //take fixed ids to make things easy
777 item.sinkClassID = 1;
779 item.listSoundProperties.push_back(sp);
781 item.listConnectionFormats.push_back(CF_ANALOG);
782 table.push_back(item);
// Builds the static source table used to initialise mSources: eleven sources with the fixed IDs 0..10
// (the loop bound is inclusive), named "mySource" plus a suffix taken from temp, source class 1,
// domainID 0 as placeholder (set in routingInterfacesReady()), connection format CF_ANALOG.
// NOTE(review): CF_ANALOG is appended to item's list in every iteration; unless item is reset in
// lines not visible here, later sources accumulate the entries of earlier ones.
787 std::vector<am_Source_s> AsyncRoutingSender::createSourceTable()
789 //create a bunch full of sources
790 std::vector<am_Source_s> table;
792 for (int16_t i = 0; i <= 10; i++)
794 std::stringstream temp;
796 item.domainID = 0; //we cannot know this when the table is created !
797 item.name = "mySource" + temp.str();
798 item.sourceID = i; //take fixed ids to make things easy
799 item.sourceClassID = 1;
802 item.listConnectionFormats.push_back(CF_ANALOG);
803 table.push_back(item);
808 void AsyncRoutingSender::insertConnectionSafe(am_connectionID_t connectionID, am_RoutingElement_s route)
810 pthread_mutex_lock(&mMapConnectionMutex);
811 mMapConnectionIDRoute.insert(std::make_pair(connectionID, route));
812 pthread_mutex_unlock(&mMapConnectionMutex);
815 void AsyncRoutingSender::removeHandleSafe(uint16_t handle)
817 pthread_mutex_lock(&mMapHandleWorkerMutex);
818 if (mMapHandleWorker.erase(handle))
820 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeHandle could not remove handle"));
822 pthread_mutex_unlock(&mMapHandleWorkerMutex);
825 void AsyncRoutingSender::removeConnectionSafe(am_connectionID_t connectionID)
827 pthread_mutex_lock(&mMapConnectionMutex);
828 if (mMapConnectionIDRoute.erase(connectionID))
830 DLT_LOG(PluginRoutingAsync, DLT_LOG_ERROR, DLT_STRING("AsyncRoutingSender::removeConnectionSafe could not remove connection"));
832 pthread_mutex_unlock(&mMapConnectionMutex);
// Stores volume in the mSinks entry whose sinkID matches, holding mSinksMutex for the whole search.
// An unknown sinkID leaves the table unchanged.
835 void AsyncRoutingSender::updateSinkVolumeSafe(am_sinkID_t sinkID, am_volume_t volume)
837 pthread_mutex_lock(&mSinksMutex);
838 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
839 for (; sinkIter != mSinks.end(); ++sinkIter)
841 if (sinkIter->sinkID == sinkID)
843 sinkIter->volume = volume;
847 pthread_mutex_unlock(&mSinksMutex);
// Stores volume in the mSources entry whose sourceID matches, holding mSourcesMutex for the whole
// search. An unknown sourceID leaves the table unchanged.
850 void am::AsyncRoutingSender::updateSourceVolumeSafe(am_sourceID_t sourceID, am_volume_t volume)
852 pthread_mutex_lock(&mSourcesMutex);
853 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
854 for (; sourceIter != mSources.end(); ++sourceIter)
856 if (sourceIter->sourceID == sourceID)
858 sourceIter->volume = volume;
862 pthread_mutex_unlock(&mSourcesMutex);
// Stores state as sourceState of the mSources entry whose sourceID matches, holding mSourcesMutex
// for the whole search. An unknown sourceID leaves the table unchanged.
865 void am::AsyncRoutingSender::updateSourceStateSafe(am_sourceID_t sourceID, am_SourceState_e state)
867 pthread_mutex_lock(&mSourcesMutex);
868 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
869 for (; sourceIter != mSources.end(); ++sourceIter)
871 if (sourceIter->sourceID == sourceID)
873 sourceIter->sourceState = state;
877 pthread_mutex_unlock(&mSourcesMutex);
// Updates one sound property of a sink under mSinksMutex: in the mSinks entry whose sinkID matches,
// the value of the property with the same type as soundProperty is overwritten.
// If the sink or a property of that type does not exist, nothing is changed.
880 void am::AsyncRoutingSender::updateSinkSoundPropertySafe(am_sinkID_t sinkID, am_SoundProperty_s soundProperty)
882 pthread_mutex_lock(&mSinksMutex);
883 std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
884 for (; sinkIter != mSinks.end(); ++sinkIter)
886 if (sinkIter->sinkID == sinkID)
888 std::vector<am_SoundProperty_s>::iterator spIterator = sinkIter->listSoundProperties.begin();
889 for (; spIterator != sinkIter->listSoundProperties.end(); ++spIterator)
891 if (spIterator->type == soundProperty.type)
893 spIterator->value = soundProperty.value;
899 pthread_mutex_unlock(&mSinksMutex);
// Updates one sound property of a source under mSourcesMutex: in the mSources entry whose sourceID
// matches, the value of the property with the same type as soundProperty is overwritten.
// If the source or a property of that type does not exist, nothing is changed.
902 void am::AsyncRoutingSender::updateSourceSoundPropertySafe(am_sourceID_t sourceID, am_SoundProperty_s soundProperty)
904 pthread_mutex_lock(&mSourcesMutex);
905 std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
906 for (; sourceIter != mSources.end(); ++sourceIter)
908 if (sourceIter->sourceID == sourceID)
910 std::vector<am_SoundProperty_s>::iterator spIterator = sourceIter->listSoundProperties.begin();
911 for (; spIterator != sourceIter->listSoundProperties.end(); ++spIterator)
913 if (spIterator->type == soundProperty.type)
915 spIterator->value = soundProperty.value;
921 pthread_mutex_unlock(&mSourcesMutex);
// Stores domainState as state of the mDomains entry whose domainID matches, holding mDomainsMutex
// for the whole search. An unknown domainID leaves the table unchanged.
924 void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState)
926 pthread_mutex_lock(&mDomainsMutex);
927 std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
928 for (; domainIter != mDomains.end(); ++domainIter)
930 if (domainIter->domainID == domainID)
932 domainIter->state = domainState;
936 pthread_mutex_unlock(&mDomainsMutex);
939 uint16_t AsyncRoutingSender::getInterfaceVersion() const
941 return (RoutingSendVersion);
// Setting several source sound properties at once; listSoundProperties is only cast to void in the
// visible lines, i.e. it is not used. The rest of the body and the return value are not visible here.
944 am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperties(const am_Handle_s handle, const am_sourceID_t sourceID, const std::vector<am_SoundProperty_s> & listSoundProperties)
949 (void) listSoundProperties;
// Setting several sink sound properties at once; listSoundProperties is only cast to void in the
// visible lines, i.e. it is not used. The rest of the body and the return value are not visible here.
953 am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperties(const am_Handle_s handle, const am_sinkID_t sinkID, const std::vector<am_SoundProperty_s> & listSoundProperties)
958 (void) listSoundProperties;
// Builds the static gateway table used to initialise mGateways: a single entry named "myGateway".
// Further field assignments of the entry are not visible here.
962 std::vector<am_Gateway_s> AsyncRoutingSender::createGatewayTable()
964 std::vector<am_Gateway_s> table;
966 item.name = "myGateway";
969 table.push_back(item);
// Worker for an asynchronous connect; keeps the sender, connection ID, source ID and connection format
// for start2work(). The remaining initialisers (base class, shadow, handle, sink ID) are not visible here.
973 asycConnectWorker::asycConnectWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow* shadow, const am_Handle_s handle, const am_connectionID_t connectionID, const am_sourceID_t sourceID, const am_sinkID_t sinkID, const am_ConnectionFormat_e connectionFormat) :
975 mAsyncSender(asyncSender), //
978 mConnectionID(connectionID), //
979 mSourceID(sourceID), //
981 mConnectionFormat(connectionFormat)
// Job body of the connect worker: after the (not visible) wait step it records the route
// (sink, source, format) under mConnectionID in the sender, acknowledges the connect with E_OK
// through the shadow and finally removes the handle from the sender's handle map.
985 void asycConnectWorker::start2work()
987 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start connecting"));
992 //do something for one second
995 am_RoutingElement_s route;
996 route.sinkID = mSinkID;
997 route.sourceID = mSourceID;
998 route.connectionFormat = mConnectionFormat;
1000 //enter new connectionID into the list
1001 mAsyncSender->insertConnectionSafe(mConnectionID, route);
1004 mShadow->ackConnect(mHandle, mConnectionID, E_OK);
1006 //destroy the handle
1007 mAsyncSender->removeHandleSafe(mHandle.handle);
1010 void asycConnectWorker::cancelWork()
1012 mAsyncSender->removeHandleSafe(mHandle.handle);
1013 mShadow->ackConnect(mHandle, mConnectionID, E_ABORTED);
// Worker for an asynchronous disconnect; keeps the sender and the connection ID for start2work().
// The remaining initialisers (base class, shadow, handle) are not visible here.
1016 asycDisConnectWorker::asycDisConnectWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_connectionID_t connectionID) :
1018 mAsyncSender(asyncSender), //
1021 mConnectionID(connectionID)
// Job body of the disconnect worker: after the (not visible) wait step it updates the sender's
// connection map, acknowledges the disconnect with E_OK through the shadow and removes the handle
// from the sender's handle map.
1025 void asycDisConnectWorker::start2work()
1027 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start disconnecting"));
1032 //do something for one second
1035 am_RoutingElement_s route;
1037 //enter new connectionID into the list
// NOTE(review): a disconnect calls insertConnectionSafe() with a route that has no visible assignment;
// removeConnectionSafe(mConnectionID) looks like the intended call (comment and call appear copied
// from the connect worker) - confirm.
1038 mAsyncSender->insertConnectionSafe(mConnectionID, route);
1041 mShadow->ackDisconnect(mHandle, mConnectionID, E_OK);
1043 //destroy the handle
1044 mAsyncSender->removeHandleSafe(mHandle.handle);
1048 void asycDisConnectWorker::cancelWork()
1050 mAsyncSender->removeHandleSafe(mHandle.handle);
1051 mShadow->ackDisconnect(mHandle, mConnectionID, E_ABORTED);
// Worker for an asynchronous sink volume change; keeps the sender and the sink's current volume as
// starting point for start2work(). The remaining initialisers are not visible here.
1054 asyncSetSinkVolumeWorker::asyncSetSinkVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1056 mAsyncSender(asyncSender), //
1058 mCurrentVolume(currentVolume), //
// Job body of the sink volume worker: moves mCurrentVolume towards mVolume and reports every
// intermediate value with ackSinkVolumeTick; afterwards the reached volume is stored in the sender's
// sink table, the change is acknowledged with E_OK and the handle is removed.
// The stepping and waiting statements inside the loop are not visible here.
1067 void asyncSetSinkVolumeWorker::start2work()
1069 //todo: this implementation does not respect time and ramp....
1070 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink volume"));
// 10 ms - presumably the wait per volume step passed to timedWait(); confirm
1072 t.tv_nsec = 10000000;
1075 while (mCurrentVolume != mVolume)
1077 if (mCurrentVolume < mVolume)
1081 mShadow->ackSinkVolumeTick(mHandle, mSinkID, mCurrentVolume);
1086 //enter new connectionID into the list
1087 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1090 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_OK);
1092 //destroy the handle
1093 mAsyncSender->removeHandleSafe(mHandle.handle);
1096 void asyncSetSinkVolumeWorker::cancelWork()
1098 mAsyncSender->updateSinkVolumeSafe(mSinkID, mCurrentVolume);
1099 mAsyncSender->removeHandleSafe(mHandle.handle);
1100 mShadow->ackSetSinkVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
// Worker for an asynchronous source volume change; keeps the sender, the source's current volume as
// starting point and the source ID. The remaining initialisers are not visible here.
1103 asyncSetSourceVolumeWorker::asyncSetSourceVolumeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_volume_t currentVolume, const am_Handle_s handle, const am_sourceID_t SourceID, const am_volume_t volume, const am_RampType_e ramp, const am_time_t time) :
1105 mAsyncSender(asyncSender), //
1107 mCurrentVolume(currentVolume), //
1109 mSourceID(SourceID), //
//Work routine of the source volume worker: runs as long as mCurrentVolume
//differs from mVolume and reports the current value via ackSourceVolumeTick.
//Afterwards it hands the reached volume to the sender, acknowledges the change
//with E_OK and removes the handle.
1116 void asyncSetSourceVolumeWorker::start2work()
1118 //todo: this implementation does not respect time and ramp....
1119 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source volume"));
//10000000 ns = 10 ms
1121 t.tv_nsec = 10000000;
//finished as soon as the current volume equals the requested volume
1124 while (mCurrentVolume != mVolume)
1126 if (mCurrentVolume < mVolume)
1130 mShadow->ackSourceVolumeTick(mHandle, mSourceID, mCurrentVolume);
1135 //pass the reached volume of this source on to the sender
1136 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1139 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_OK);
1141 //destroy the handle
1142 mAsyncSender->removeHandleSafe(mHandle.handle);
//Cancel handler of the source volume worker: hands the volume reached so far
//(mCurrentVolume) to the sender, removes the handle and acknowledges the
//volume change with E_ABORTED, reporting that same volume.
1145 void asyncSetSourceVolumeWorker::cancelWork()
1147 mAsyncSender->updateSourceVolumeSafe(mSourceID, mCurrentVolume);
1148 mAsyncSender->removeHandleSafe(mHandle.handle);
1149 mShadow->ackSetSourceVolumeChange(mHandle, mCurrentVolume, E_ABORTED);
//Constructor of the source state worker.
//asyncSender is kept in mAsyncSender and sourceID in mSourceID.
//NOTE(review): the remaining arguments are presumably stored in the members
//that start2work()/cancelWork() read (mShadow, mHandle, mSourcestate) - confirm.
1152 asyncSetSourceStateWorker::asyncSetSourceStateWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_sourceID_t sourceID, const am_SourceState_e state) :
1154 mAsyncSender(asyncSender), //
1157 mSourceID(sourceID), //
//Work routine of the source state worker: hands mSourcestate for mSourceID to
//the sender, acknowledges the request with E_OK and removes the handle.
1162 void asyncSetSourceStateWorker::start2work()
1164 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source state"));
1169 //do something for one second
1173 //pass the new state of this source on to the sender
1174 mAsyncSender->updateSourceStateSafe(mSourceID, mSourcestate);
1177 mShadow->ackSetSourceState(mHandle, E_OK);
1179 //destroy the handle
1180 mAsyncSender->removeHandleSafe(mHandle.handle);
//Cancel handler of the source state worker: acknowledges the request with
//E_ABORTED and removes the handle. The source state is not passed to the sender.
1183 void asyncSetSourceStateWorker::cancelWork()
1186 mShadow->ackSetSourceState(mHandle, E_ABORTED);
1188 //destroy the handle
1189 mAsyncSender->removeHandleSafe(mHandle.handle);
//Constructor of the sink sound property worker.
//asyncSender is kept in mAsyncSender and soundProperty in mSoundProperty.
//NOTE(review): the remaining arguments are presumably stored in the members
//that start2work()/cancelWork() read (mShadow, mHandle, mSinkID) - confirm.
1192 asyncSetSinkSoundPropertyWorker::asyncSetSinkSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sinkID_t sinkID) :
1194 mAsyncSender(asyncSender), //
1198 mSoundProperty(soundProperty)
//Work routine of the sink sound property worker: hands mSoundProperty for
//mSinkID to the sender, acknowledges the request with E_OK and removes the handle.
1202 void asyncSetSinkSoundPropertyWorker::start2work()
1204 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting sink sound property"));
1209 //do something for one second
1213 //pass the new sound property of this sink on to the sender
1214 mAsyncSender->updateSinkSoundPropertySafe(mSinkID, mSoundProperty);
1217 mShadow->ackSetSinkSoundProperty(mHandle, E_OK);
1219 //destroy the handle
1220 mAsyncSender->removeHandleSafe(mHandle.handle);
//Cancel handler of the sink sound property worker: acknowledges the request
//with E_ABORTED and removes the handle.
//The sound property is not passed to the sender on this path, so the request
//must not be reported as E_OK; the other cancel handlers that send an
//acknowledgement report E_ABORTED as well.
1223 void asyncSetSinkSoundPropertyWorker::cancelWork()
1226 mShadow->ackSetSinkSoundProperty(mHandle, E_ABORTED);
1228 //destroy the handle
1229 mAsyncSender->removeHandleSafe(mHandle.handle);
//Constructor of the source sound property worker.
//asyncSender is kept in mAsyncSender, sourceID in mSourceID and soundProperty
//in mSoundProperty.
//NOTE(review): the remaining arguments are presumably stored in the members
//that start2work()/cancelWork() read (mShadow, mHandle) - confirm.
1232 asyncSetSourceSoundPropertyWorker::asyncSetSourceSoundPropertyWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_Handle_s handle, const am_SoundProperty_s soundProperty, const am_sourceID_t sourceID) :
1234 mAsyncSender(asyncSender), //
1237 mSourceID(sourceID), //
1238 mSoundProperty(soundProperty)
//Work routine of the source sound property worker: hands mSoundProperty for
//mSourceID to the sender, acknowledges the request with E_OK and removes the handle.
1242 void asyncSetSourceSoundPropertyWorker::start2work()
1244 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting source sound property"));
1249 //do something for one second
1253 //pass the new sound property of this source on to the sender
1254 mAsyncSender->updateSourceSoundPropertySafe(mSourceID, mSoundProperty);
1257 mShadow->ackSetSourceSoundProperty(mHandle, E_OK);
1259 //destroy the handle
1260 mAsyncSender->removeHandleSafe(mHandle.handle);
//Cancel handler of the source sound property worker: acknowledges the request
//with E_ABORTED and removes the handle.
//The sound property is not passed to the sender on this path, so the request
//must not be reported as E_OK; the other cancel handlers that send an
//acknowledgement report E_ABORTED as well.
1263 void asyncSetSourceSoundPropertyWorker::cancelWork()
1266 mShadow->ackSetSourceSoundProperty(mHandle, E_ABORTED);
1268 //destroy the handle
1269 mAsyncSender->removeHandleSafe(mHandle.handle);
//Constructor of the domain state change worker.
//asyncSender is kept in mAsyncSender, domainID in mDomainID and domainState
//in mDomainState.
//NOTE(review): shadow is presumably stored in mShadow, which start2work() and
//cancelWork() use - confirm.
1272 asyncDomainStateChangeWorker::asyncDomainStateChangeWorker(AsyncRoutingSender *asyncSender, WorkerThreadPool *pool, RoutingReceiverAsyncShadow *shadow, const am_domainID_t domainID, const am_DomainState_e domainState) :
1274 mAsyncSender(asyncSender), //
1276 mDomainID(domainID), //
1277 mDomainState(domainState)
//Work routine of the domain state change worker: hands mDomainState for
//mDomainID to the sender and reports it through hookDomainStateChange.
//The log text now names the domain state; it previously repeated the message
//of the source sound property worker.
1281 void asyncDomainStateChangeWorker::start2work()
1283 //todo: sendchanged data must be in here !
1284 DLT_LOG(PluginRoutingAsync, DLT_LOG_INFO, DLT_STRING("Start setting domain state"));
1289 //do something for one second
1293 //pass the new state of this domain on to the sender
1294 mAsyncSender->updateDomainstateSafe(mDomainID, mDomainState);
1295 mShadow->hookDomainStateChange(mDomainID, mDomainState);
1296 //send the new status
//Cancel handler of the domain state change worker: reports mDomainState for
//mDomainID through hookDomainStateChange. The sender is not updated here.
1300 void am::asyncDomainStateChangeWorker::cancelWork()
1302 //send the new status
1303 mShadow->hookDomainStateChange(mDomainID, mDomainState);