2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "listplusplus.h"
26 using namespace std::placeholders;
31 static int PPSUpdate(void* data)
33 Core::Performance *performance = (Core::Performance*)data;
35 if(performance->propertiesPerSecond > 0 && performance->propertiesPerSecond != lastpps)
37 lastpps = performance->propertiesPerSecond;
38 DebugOut(1)<<"Property updates per second: "<<performance->propertiesPerSecond<<endl;
41 performance->propertiesPerSecond = 0;
43 if(performance->firedPropertiesPerSecond > 0 && performance->firedPropertiesPerSecond != lastfpps)
45 lastfpps = performance->firedPropertiesPerSecond;
46 DebugOut(1)<<"Fired property updates per second: "<<performance->firedPropertiesPerSecond<<endl;
49 performance->firedPropertiesPerSecond = 0;
54 Core::Core(std::map<string, string> config): AbstractRoutingEngine(config), handleCount(0)
56 g_timeout_add(1000,PPSUpdate,&performance);
58 auto simpleCb = [this](amb::Queue<AbstractPropertyType*, amb::PropertyCompare>* q)
62 AbstractPropertyType* value = q->pop();
63 updateProperty(value);
71 if(config.find("highPriorityQueueSize") != config.end())
73 hpqs = boost::lexical_cast<int, std::string>(config["highPriorityQueueSize"]);
76 if(config.find("normalPriorityQueueSize") != config.end())
78 npqs = boost::lexical_cast<int, std::string>(config["normalPriorityQueueSize"]);
81 if(config.find("lowPriorityQueueSize") != config.end())
83 lpqs = boost::lexical_cast<int, std::string>(config["lowPriorityQueueSize"]);
86 watcherPtr = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueue, simpleCb, npqs);
87 watcherPtrLow = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueLow,
89 AbstractPropertyType::Low);
90 watcherPtrHigh = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueHigh,
92 AbstractPropertyType::High);
100 for(auto itr = mSinks.begin(); itr != mSinks.end(); ++itr)
103 itr = mSinks.begin();
108 void Core::registerSource(AbstractSource *source)
110 mSources.insert(source);
113 void Core::updateSupported(PropertyList added, PropertyList removed, AbstractSource* source)
115 if(!source || mSources.find(source) == mSources.end())
118 /// add the newly supported to master list
121 handleAddSupported(added, source);
123 /// removed no longer supported properties from master list.
125 handleRemoveSupported(removed, source);
127 /// tell all sinks about the newly supported properties.
129 PropertyList s = supported();
131 if(!s.size()) return;
133 for(auto sink : mSinks)
135 sink->supportedChanged(s);
139 PropertyList Core::supported()
141 PropertyList supportedProperties;
143 std::transform(mMasterPropertyList.begin(), mMasterPropertyList.end(), std::back_inserter(supportedProperties),
144 [](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& itr)
149 // remove duplicates:
150 std::sort(supportedProperties.begin(), supportedProperties.end());
151 auto itr = std::unique(supportedProperties.begin(), supportedProperties.end());
153 supportedProperties.erase(itr,supportedProperties.end());
155 return supportedProperties;
158 void Core::updateProperty(AbstractPropertyType *value, const string & uuid)
160 if(value->sourceUuid != uuid)
162 value->sourceUuid = uuid;
165 if(value->priority == AbstractPropertyType::Instant)
166 updateProperty(value);
167 else if(value->priority == AbstractPropertyType::High)
169 value->destroyed.push_back([this](AbstractPropertyType* v)
171 updatePropertyQueueHigh.remove(v);
173 updatePropertyQueueHigh.append(value);
175 else if(value->priority == AbstractPropertyType::Normal)
177 value->destroyed.push_back([this](AbstractPropertyType* v)
179 updatePropertyQueue.remove(v);
181 updatePropertyQueue.append(value);
183 else if(value->priority == AbstractPropertyType::Low)
185 value->destroyed.push_back([this](AbstractPropertyType* v)
187 updatePropertyQueueLow.remove(v);
189 updatePropertyQueueLow.append(value);
193 void Core::updateProperty(AbstractPropertyType * value)
195 VehicleProperty::Property & property = value->name;
196 const string & uuid = value->sourceUuid;
198 performance.propertiesPerSecond++;
200 auto filteredSourceSinkMapIt = propertySinkMap.find(property);
201 auto cbMapItr = propertyCbMap.find(property);
203 if(filteredSourceSinkMapIt != propertySinkMap.end())
205 const FilteredSourceSinkMap & filteredSourceSinks = filteredSourceSinkMapIt->second;
207 DebugOut()<<__FUNCTION__<<"() there are "<<filteredSourceSinks.size()<<" sinks connected to property: "<<property<<endl;
209 performance.firedPropertiesPerSecond++;
211 for(auto itr = filteredSourceSinks.begin(); itr != filteredSourceSinks.end(); ++itr)
213 AbstractSink* sink = itr->first;
214 const std::string & sourceUuid = itr->second;
216 bool isFiltered = !sourceUuid.empty();
220 DebugOut()<<"Property ("<<property<<") for sink is filtered for source: "<<sourceUuid<<endl;
223 if( !isFiltered || sourceUuid == uuid)
225 sink->propertyChanged(value);
231 DebugOut()<<__FUNCTION__<<"() there are no sinks connected to property: "<<property<<endl;
234 if(cbMapItr != propertyCbMap.end())
236 FilteredSourceCbMap cbs = (*cbMapItr).second;
240 uint handle = itr.first;
241 const std::string& sourceUuid = itr.second;
243 bool isFiltered = !sourceUuid.empty();
247 DebugOut()<<"Property ("<<property<<") for cb is filtered for source: "<<sourceUuid<<endl;
250 if( !isFiltered || sourceUuid == uuid)
252 if(handleCbMap.count(handle))
254 auto cb = handleCbMap[handle];
261 DebugOut(DebugOut::Warning)<<"Failed to call callback subscribed to property: "<<property<<endl;
269 DebugOut()<<__FUNCTION__<<"() there are no cb connected to property: "<<property<<endl;
274 void Core::registerSink(AbstractSink *self)
279 void Core::unregisterSink(AbstractSink *self)
284 AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
286 AbstractSource* source = sourceForProperty(request.property, request.sourceUuidFilter);
288 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
290 if(!source || ((source->supportedOperations() & AbstractSource::Get) != AbstractSource::Get)) { // not found or doesn't support AbstractSource::Get
291 // Don't wait until timer expire, complete with error here.
292 reply->error = AsyncPropertyReply::InvalidOperation;
293 if(request.completed)
294 request.completed(reply);
297 source->getPropertyAsync(reply);
301 /** right now the owner of the reply becomes the requestor that called this method.
302 * reply will become invalid after the first reply. */
306 void Core::getRangePropertyAsync(AsyncRangePropertyRequest request)
308 AsyncRangePropertyReply * reply = new AsyncRangePropertyReply(request);
310 bool anySupport = false;
311 for(auto src : mSources)
313 if(((src->supportedOperations() & AbstractSource::GetRanged) == AbstractSource::GetRanged))
316 src->getRangePropertyAsync(reply);
322 reply->success = false;
323 reply->error = AsyncPropertyReply::InvalidOperation;
324 reply->completed(reply);
328 AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
330 AbstractSource* src = sourceForProperty(request.property, request.sourceUuidFilter);
332 if(src && ((src->supportedOperations() & AbstractSource::Set) == AbstractSource::Set))
333 return src->setProperty(request);
335 DebugOut(DebugOut::Warning)<<"Error: setProperty opration failed. Property may not be supported: "<<request.property<<endl;
339 bool Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractSink* sink)
341 auto sinksIt = propertySinkMap.find(property);
342 if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
344 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
348 DebugOut(1)<<"Subscribing to: "<<property<<endl;
350 bool subscribed(false);
351 auto itr = mMasterPropertyList.begin();
352 while(itr != mMasterPropertyList.end())
354 VehicleProperty::Property prop = itr->second;
355 if(prop == property) {
356 AbstractSource* src = itr->first;
357 src->subscribeToPropertyChanges(property);
358 // Move to next source. It will skip all the remaining properties in this source.
359 itr = mMasterPropertyList.upper_bound(src);
368 propertySinkMap[property].emplace(sink, std::string(""));
373 bool Core::subscribeToProperty(const VehicleProperty::Property &property, const string &sourceUuidFilter, AbstractSink *sink)
375 auto sinksIt = propertySinkMap.find(property);
376 if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
378 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
382 DebugOut(1)<<"Subscribing to: "<<property<<endl;
384 AbstractSource* src = sourceForProperty(property, sourceUuidFilter);
388 propertySinkMap[property].emplace(sink, sourceUuidFilter);
390 src->subscribeToPropertyChanges(property);
395 bool Core::subscribeToProperty(const VehicleProperty::Property &, const string & sourceUuidFilter, Zone::Type zoneFilter, AbstractSink *sink)
398 throw std::runtime_error("Not implemented");
401 uint Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractRoutingEngine::PropertyChangedType cb, std::string pid)
403 DebugOut(1)<<"Subscribing to: "<<property<<endl;
405 auto itr = mMasterPropertyList.begin();
406 while(itr != mMasterPropertyList.end())
408 VehicleProperty::Property prop = itr->second;
409 if(prop == property) {
410 AbstractSource* src = itr->first;
411 src->subscribeToPropertyChanges(property);
412 // Move to next source. It will skip all the remaining properties in this source.
413 itr = mMasterPropertyList.upper_bound(src);
420 handleCbMap[++handleCount] = cb;
421 propertyCbMap[property].emplace(handleCount, std::string(""));
425 bool Core::unsubscribeToProperty(const VehicleProperty::Property & property, AbstractSink* sink)
427 auto sinksIt = propertySinkMap.find(property);
428 if(sinksIt == propertySinkMap.end())
430 DebugOut(1)<<__FUNCTION__<<" property not subscribed to: "<<property<<endl;
434 sinksIt->second.erase(sink);
436 /// Now we check to see if this is the last subscriber
437 if(sinksIt->second.empty())
439 propertySinkMap.erase(sinksIt);
440 auto itr = mMasterPropertyList.begin();
441 while(itr != mMasterPropertyList.end())
443 if(itr->second == property) {
444 AbstractSource* src = itr->first;
445 src->unsubscribeToPropertyChanges(property);
446 // Move to next source. It will skip all the remaining properties in this source.
447 itr = mMasterPropertyList.upper_bound(src);
458 void Core::unsubscribeToProperty(uint handle)
460 handleCbMap.erase(handle);
461 /// TODO: unsubscribe from source
464 PropertyInfo Core::getPropertyInfo(const VehicleProperty::Property &property, const string &sourceUuid)
467 return PropertyInfo::invalid();
469 auto srcs = sourcesForProperty(property);
471 if(!contains(srcs, sourceUuid))
472 return PropertyInfo::invalid();
474 auto theSource = find_if(mSources.begin(), mSources.end(),[&sourceUuid](const std::set<AbstractSource*>::value_type & itr)
476 return (itr)->uuid() == sourceUuid;
479 return (*theSource)->getPropertyInfo(property);
482 std::vector<string> Core::sourcesForProperty(const VehicleProperty::Property & property)
484 std::vector<std::string> list;
486 for(auto src : mSources)
488 if(contains(src->supported(), property))
489 list.push_back(src->uuid());
495 void Core::inspectSupported()
497 for(AbstractSource* src : mSources)
499 updateSupported(src->supported(), PropertyList(), src);
503 void Core::handleAddSupported(const PropertyList& added, AbstractSource* source)
506 throw std::runtime_error("Core::handleAddSupported passed a null source");
508 if(!contains(mSources, source))
510 mSources.insert(source);
513 for(auto property : added)
515 if(!sourceForProperty(property, source->uuid()))
516 mMasterPropertyList.emplace(source, property);
518 // Subscribe to property in a new source if such property was subscribed. This catches newly supported properties in the process.
519 if( propertySinkMap.find(property) != propertySinkMap.end()){
520 source->subscribeToPropertyChanges(property);
525 void Core::handleRemoveSupported(const PropertyList& removed, AbstractSource* source)
530 auto range = mMasterPropertyList.equal_range(source);
531 for(auto itr = removed.begin(); itr != removed.end(); ++itr)
534 // TODO: We do not have info about all subscribed sources in
535 // std::unordered_map<VehicleProperty::Property, std::map<AbstractSink*, std::string> > propertySinkMap
536 // so we do not know if we can/should remove property from propertySinkMap,
537 // but I suppose this should be handled by each AbstractSink implementation in a callback AbstractSink::supportedChanged().
539 const VehicleProperty::Property property(*itr);
542 range.first, // the first property in source
543 range.second, // one item right after the last property in source
544 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
547 if (it != range.second)// property was found
549 mMasterPropertyList.erase(it);// References and iterators to the erased elements are invalidated. Other iterators and references are not invalidated.
551 // TODO: Do we need to unsubscribe here ???
556 AbstractSource* Core::sourceForProperty(const VehicleProperty::Property& property, const std::string& sourceUuidFilter) const
558 auto it = mMasterPropertyList.end();
559 if(sourceUuidFilter.empty()){
560 it = std::find_if(mMasterPropertyList.begin(), mMasterPropertyList.end(),
561 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
565 auto itSource = find_if(mSources.begin(),mSources.end(),[&sourceUuidFilter](const std::set<AbstractSource*>::value_type & it)
567 return (it)->uuid() == sourceUuidFilter;
569 if(itSource != mSources.end()){
570 auto range = mMasterPropertyList.equal_range(*itSource);
572 range.first, // the first property in source
573 range.second, // one item right after the last property in source
574 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it)
576 return it.second == property; }
579 if (temp != range.second)// property was found
584 if(it == mMasterPropertyList.end())