2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "listplusplus.h"
26 using namespace std::placeholders;
// Periodic statistics callback: logs and then resets the per-second counters
// stored in a Core::Performance object. Core's constructor registers it with
// g_timeout_add() using a 1000 ms interval.
// @param data  opaque user pointer; cast below to Core::Performance*.
// NOTE(review): the declarations of lastpps/lastfpps and this function's return
// statement are not visible in this listing. A GLib timeout source keeps firing
// only while its callback returns TRUE -- confirm the return value.
31 static int PPSUpdate(void* data)
33 Core::Performance *performance = (Core::Performance*)data;
// Log only when the rate is non-zero and differs from the last value recorded.
35 if(performance->propertiesPerSecond > 0 && performance->propertiesPerSecond != lastpps)
37 lastpps = performance->propertiesPerSecond;
38 DebugOut(1)<<"Property updates per second: "<<performance->propertiesPerSecond<<endl;
// Start a fresh counting window for the next tick.
41 performance->propertiesPerSecond = 0;
// Same change-only reporting for the "fired" counter.
43 if(performance->firedPropertiesPerSecond > 0 && performance->firedPropertiesPerSecond != lastfpps)
45 lastfpps = performance->firedPropertiesPerSecond;
46 DebugOut(1)<<"Fired property updates per second: "<<performance->firedPropertiesPerSecond<<endl;
49 performance->firedPropertiesPerSecond = 0;
// Constructs the routing core: forwards config to AbstractRoutingEngine, zeroes
// handleCount, registers the PPSUpdate statistics callback (1000 ms interval)
// and creates one AsyncQueueWatcher for each of the normal, low and high
// priority update queues.
// @param config  key/value settings. Optional keys read here:
//                "highPriorityQueueSize", "normalPriorityQueueSize",
//                "lowPriorityQueueSize" (each parsed as int).
// NOTE(review): boost::lexical_cast throws boost::bad_lexical_cast when a value
// is not a valid integer; no handler is visible in this listing.
// NOTE(review): the declarations/defaults of hpqs, npqs and lpqs and part of the
// low/high watcher argument lists are not visible here.
54 Core::Core(std::map<string, string> config): AbstractRoutingEngine(config), handleCount(0)
56 g_timeout_add(1000,PPSUpdate,&performance);
// Queue callback: takes a value off the queue and dispatches it through the
// single-argument updateProperty().
58 auto simpleCb = [this](amb::Queue<AbstractPropertyType*, amb::PropertyCompare>* q)
62 AbstractPropertyType* value = q->pop();
63 updateProperty(value);
// Each queue size is overridden only when its key is present in config.
71 if(config.find("highPriorityQueueSize") != config.end())
73 hpqs = boost::lexical_cast<int, std::string>(config["highPriorityQueueSize"]);
76 if(config.find("normalPriorityQueueSize") != config.end())
78 npqs = boost::lexical_cast<int, std::string>(config["normalPriorityQueueSize"]);
81 if(config.find("lowPriorityQueueSize") != config.end())
83 lpqs = boost::lexical_cast<int, std::string>(config["lowPriorityQueueSize"]);
// One watcher per queue; the low and high watchers are additionally given
// their AbstractPropertyType priority value.
86 watcherPtr = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueue, simpleCb, npqs);
87 watcherPtrLow = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueLow,
89 AbstractPropertyType::Low);
90 watcherPtrHigh = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueHigh,
92 AbstractPropertyType::High);
// Walks the registered sinks.
// NOTE(review): the header of the enclosing definition and the statement
// between these two lines are not visible in this listing (presumably this is
// the destructor releasing each sink -- confirm).
// NOTE(review): itr is reassigned to mSinks.begin() and the for-statement then
// applies ++itr. If both lines belong to the same loop body, verify that this
// neither skips elements nor increments past end().
100 for(auto itr = mSinks.begin(); itr != mSinks.end(); ++itr)
103 itr = mSinks.begin();
// Adds a source to the set of known sources (mSources).
// @param source  source to record; stored as a raw pointer. No null check is
//                visible here.
108 void Core::registerSource(AbstractSource *source)
110 mSources.insert(source);
// Applies a change in the set of properties a source supports and notifies
// every registered sink with the resulting aggregate list.
// @param added    properties the source now supports.
// @param removed  properties the source no longer supports.
// @param source   the reporting source; checked below for null and for
//                 membership in mSources (the action taken on failure is not
//                 visible in this listing).
113 void Core::updateSupported(PropertyList added, PropertyList removed, AbstractSource* source)
115 if(!source || mSources.find(source) == mSources.end())
118 /// add the newly supported to master list
121 handleAddSupported(added, source);
123 /// remove no longer supported properties from master list.
125 handleRemoveSupported(removed, source);
127 /// tell all sinks about the newly supported properties.
129 PropertyList s = supported();
// NOTE(review): with an empty aggregate list the function returns before the
// notification loop, so sinks get no supportedChanged() call when the last
// property disappears -- confirm this is intended.
131 if(!s.size()) return;
133 for(auto sink : mSinks)
135 sink->supportedChanged(s);
// Builds the list of supported properties from mMasterPropertyList with
// duplicates removed.
// @return a sorted PropertyList in which each entry appears once.
139 PropertyList Core::supported()
141 PropertyList supportedProperties;
// One output element per (source, property) entry; the lambda body is not
// visible in this listing (presumably it yields the entry's property).
143 std::transform(mMasterPropertyList.begin(), mMasterPropertyList.end(), std::back_inserter(supportedProperties),
144 [](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& itr)
149 // remove duplicates:
// std::unique only collapses adjacent equal elements, hence the sort first.
150 std::sort(supportedProperties.begin(), supportedProperties.end());
151 auto itr = std::unique(supportedProperties.begin(), supportedProperties.end());
153 supportedProperties.erase(itr,supportedProperties.end());
155 return supportedProperties;
// Entry point for a property update coming from a source: stamps the value with
// the source uuid and routes it according to value->priority.
// @param value  the updated property; dereferenced without a visible null check.
// @param uuid   uuid of the originating source, written to value->sourceUuid
//               when it differs.
158 void Core::updateProperty(AbstractPropertyType *value, const string & uuid)
160 if(value->sourceUuid != uuid)
162 value->sourceUuid = uuid;
// Instant values bypass the queues and are dispatched immediately; all other
// priorities are appended to their matching queue.
165 if(value->priority == AbstractPropertyType::Instant)
166 updateProperty(value);
167 else if(value->priority == AbstractPropertyType::High)
169 updatePropertyQueueHigh.append(value);
171 else if(value->priority == AbstractPropertyType::Normal)
173 updatePropertyQueue.append(value);
175 else if(value->priority == AbstractPropertyType::Low)
177 updatePropertyQueueLow.append(value);
// Delivers one property update to its subscribers: first the sinks recorded in
// propertySinkMap, then the callbacks recorded in propertyCbMap. A subscriber
// with a non-empty source-uuid filter only receives values whose sourceUuid
// equals that filter.
// @param value  the update to deliver; its name selects the subscribers and its
//               sourceUuid is compared against each subscriber's filter.
181 void Core::updateProperty(AbstractPropertyType * value)
183 VehicleProperty::Property & property = value->name;
184 const string & uuid = value->sourceUuid;
// Counts every update, whether or not anyone is subscribed.
186 performance.propertiesPerSecond++;
188 auto filteredSourceSinkMapIt = propertySinkMap.find(property);
189 auto cbMapItr = propertyCbMap.find(property);
191 if(filteredSourceSinkMapIt != propertySinkMap.end())
193 const FilteredSourceSinkMap & filteredSourceSinks = filteredSourceSinkMapIt->second;
195 DebugOut()<<__FUNCTION__<<"() there are "<<filteredSourceSinks.size()<<" sinks connected to property: "<<property<<endl;
// Counts only updates for which propertySinkMap has an entry.
197 performance.firedPropertiesPerSecond++;
199 for(auto itr = filteredSourceSinks.begin(); itr != filteredSourceSinks.end(); ++itr)
201 AbstractSink* sink = itr->first;
202 const std::string & sourceUuid = itr->second;
// An empty filter string means "accept this property from any source".
204 bool isFiltered = !sourceUuid.empty();
208 DebugOut()<<"Property ("<<property<<") for sink is filtered for source: "<<sourceUuid<<endl;
211 if( !isFiltered || sourceUuid == uuid)
213 sink->propertyChanged(value);
219 DebugOut()<<__FUNCTION__<<"() there are no sinks connected to property: "<<property<<endl;
222 if(cbMapItr != propertyCbMap.end())
// NOTE(review): cbs is a by-value copy of the map entry. Possibly deliberate so
// that callbacks can change subscriptions while being iterated -- confirm.
224 FilteredSourceCbMap cbs = (*cbMapItr).second;
228 uint handle = itr.first;
229 const std::string& sourceUuid = itr.second;
231 bool isFiltered = !sourceUuid.empty();
235 DebugOut()<<"Property ("<<property<<") for cb is filtered for source: "<<sourceUuid<<endl;
238 if( !isFiltered || sourceUuid == uuid)
// The handle may already have been erased from handleCbMap, so check before
// looking it up.
240 if(handleCbMap.count(handle))
242 auto cb = handleCbMap[handle];
249 DebugOut(DebugOut::Warning)<<"Failed to call callback subscribed to property: "<<property<<endl;
257 DebugOut()<<__FUNCTION__<<"() there are no cb connected to property: "<<property<<endl;
// NOTE(review): only the signature is visible in this listing; presumably this
// records `self` in mSinks -- confirm against the full source.
262 void Core::registerSink(AbstractSink *self)
// NOTE(review): only the signature is visible in this listing; presumably this
// removes `self` from mSinks -- confirm against the full source.
267 void Core::unregisterSink(AbstractSink *self)
// Starts an asynchronous get of request.property, optionally restricted to the
// source named by request.sourceUuidFilter.
// @param request  property, source filter and completion callback.
// @return a heap-allocated reply; the comment at the end of this function
//         describes its ownership (the return statement itself is not visible
//         in this listing).
272 AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
274 AbstractSource* source = sourceForProperty(request.property, request.sourceUuidFilter);
276 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
278 if(!source || ((source->supportedOperations() & AbstractSource::Get) != AbstractSource::Get)) { // not found or doesn't support AbstractSource::Get
279 // Don't wait until timer expire, complete with error here.
280 reply->error = AsyncPropertyReply::InvalidOperation;
// The completion callback is optional, so it is only invoked when set.
281 if(request.completed)
282 request.completed(reply);
285 source->getPropertyAsync(reply);
289 /** right now the owner of the reply becomes the requestor that called this method.
290 * reply will become invalid after the first reply. */
// Forwards a ranged get request to each source whose supportedOperations()
// includes AbstractSource::GetRanged, and completes the reply with an
// InvalidOperation error on the failure path below.
// @param request  the ranged request; a heap-allocated reply is built from it.
// NOTE(review): the same reply object is handed to every capable source.
// Ownership and release of `reply` are not visible in this listing.
294 void Core::getRangePropertyAsync(AsyncRangePropertyRequest request)
296 AsyncRangePropertyReply * reply = new AsyncRangePropertyReply(request);
298 bool anySupport = false;
299 for(auto src : mSources)
301 if(((src->supportedOperations() & AbstractSource::GetRanged) == AbstractSource::GetRanged))
304 src->getRangePropertyAsync(reply);
// Failure path (presumably guarded by !anySupport -- the condition is not
// visible here).
310 reply->success = false;
311 reply->error = AsyncPropertyReply::InvalidOperation;
// NOTE(review): unlike getPropertyAsync(), `completed` is invoked here without
// first checking that it is set.
312 reply->completed(reply);
// Routes a set request to the source that provides request.property (honouring
// request.sourceUuidFilter) when that source supports AbstractSource::Set.
// @param request  property, value and optional source filter.
// @return the reply produced by the source's setProperty(); on failure a warning
//         is logged (the value returned on that path is not visible in this
//         listing).
316 AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
318 AbstractSource* src = sourceForProperty(request.property, request.sourceUuidFilter);
320 if(src && ((src->supportedOperations() & AbstractSource::Set) == AbstractSource::Set))
321 return src->setProperty(request);
// NOTE(review): the runtime message below misspells "operation" as "opration".
323 DebugOut(DebugOut::Warning)<<"Error: setProperty opration failed. Property may not be supported: "<<request.property<<endl;
// Subscribes a sink to a property without a source filter.
// @param property  property to subscribe to.
// @param sink      sink that will receive propertyChanged() notifications.
// @return bool; the return statements are not visible in this listing.
327 bool Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractSink* sink)
// Duplicate guard: the sink is already recorded for this property.
329 auto sinksIt = propertySinkMap.find(property);
330 if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
332 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
336 DebugOut(1)<<"Subscribing to: "<<property<<endl;
338 bool subscribed(false);
// Ask every source that lists this property to start reporting changes.
339 auto itr = mMasterPropertyList.begin();
340 while(itr != mMasterPropertyList.end())
342 VehicleProperty::Property prop = itr->second;
343 if(prop == property) {
344 AbstractSource* src = itr->first;
345 src->subscribeToPropertyChanges(property);
346 // Move to next source. It will skip all the remaining properties in this source.
347 itr = mMasterPropertyList.upper_bound(src);
// An empty filter string means the sink accepts this property from any source.
356 propertySinkMap[property].emplace(sink, std::string(""));
// Subscribes a sink to a property, restricted to one source.
// @param property          property to subscribe to.
// @param sourceUuidFilter  uuid of the source to accept updates from.
// @param sink              sink that will receive notifications.
// @return bool; the return statements are not visible in this listing.
361 bool Core::subscribeToProperty(const VehicleProperty::Property &property, const string &sourceUuidFilter, AbstractSink *sink)
// Duplicate guard: the sink is already recorded for this property.
363 auto sinksIt = propertySinkMap.find(property);
364 if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
366 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
370 DebugOut(1)<<"Subscribing to: "<<property<<endl;
// NOTE(review): sourceForProperty() can yield no source; the lines between this
// lookup and the use of src are not visible here -- confirm src is checked
// before it is dereferenced.
372 AbstractSource* src = sourceForProperty(property, sourceUuidFilter);
376 propertySinkMap[property].emplace(sink, sourceUuidFilter);
378 src->subscribeToPropertyChanges(property);
// Zone-filtered subscription overload. The only visible statement throws.
// @throws std::runtime_error with the message "Not implemented".
383 bool Core::subscribeToProperty(const VehicleProperty::Property &, const string & sourceUuidFilter, Zone::Type zoneFilter, AbstractSink *sink)
386 throw std::runtime_error("Not implemented");
// Subscribes a callback (rather than a sink) to a property.
// @param property  property to subscribe to.
// @param cb        callback stored in handleCbMap under a newly issued handle.
// @param pid       not used in the lines visible in this listing.
// @return uint; presumably the new handle, later accepted by
//         unsubscribeToProperty(uint) -- the return statement is not visible.
389 uint Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractRoutingEngine::PropertyChangedType cb, std::string pid)
391 DebugOut(1)<<"Subscribing to: "<<property<<endl;
// Ask every source that lists this property to start reporting changes.
393 auto itr = mMasterPropertyList.begin();
394 while(itr != mMasterPropertyList.end())
396 VehicleProperty::Property prop = itr->second;
397 if(prop == property) {
398 AbstractSource* src = itr->first;
399 src->subscribeToPropertyChanges(property);
400 // Move to next source. It will skip all the remaining properties in this source.
401 itr = mMasterPropertyList.upper_bound(src);
// Handles come from a pre-incremented counter that the constructor starts at 0.
408 handleCbMap[++handleCount] = cb;
// An empty filter string means the callback accepts any source.
409 propertyCbMap[property].emplace(handleCount, std::string(""));
// Removes a sink's subscription to a property. When it was the last sink for
// that property, each source listing the property is told to stop reporting it.
// @param property  property to unsubscribe from.
// @param sink      sink to remove.
// @return bool; the return statements are not visible in this listing.
413 bool Core::unsubscribeToProperty(const VehicleProperty::Property & property, AbstractSink* sink)
415 auto sinksIt = propertySinkMap.find(property);
416 if(sinksIt == propertySinkMap.end())
418 DebugOut(1)<<__FUNCTION__<<" property not subscribed to: "<<property<<endl;
422 sinksIt->second.erase(sink);
424 /// Now we check to see if this is the last subscriber
425 if(sinksIt->second.empty())
// Drop the now-empty entry; sinksIt is not used again in the visible lines.
427 propertySinkMap.erase(sinksIt);
428 auto itr = mMasterPropertyList.begin();
429 while(itr != mMasterPropertyList.end())
431 if(itr->second == property) {
432 AbstractSource* src = itr->first;
433 src->unsubscribeToPropertyChanges(property);
434 // Move to next source. It will skip all the remaining properties in this source.
435 itr = mMasterPropertyList.upper_bound(src);
// Cancels a callback subscription by handle.
// @param handle  key to erase from handleCbMap.
// NOTE(review): the matching (handle, filter) entry in propertyCbMap is not
// removed in the visible code. updateProperty() tolerates this by checking
// handleCbMap.count(handle) first, but the stale entries stay behind.
446 void Core::unsubscribeToProperty(uint handle)
448 handleCbMap.erase(handle);
449 /// TODO: unsubscribe from source
// Returns the PropertyInfo that a specific source reports for a property.
// @param property    property to query.
// @param sourceUuid  uuid of the source to ask.
// @return the source's getPropertyInfo(property), or PropertyInfo::invalid()
//         when that source does not support the property. The condition
//         guarding the first invalid() return is not visible in this listing.
452 PropertyInfo Core::getPropertyInfo(const VehicleProperty::Property &property, const string &sourceUuid)
455 return PropertyInfo::invalid();
457 auto srcs = sourcesForProperty(property);
459 if(!contains(srcs, sourceUuid))
460 return PropertyInfo::invalid();
// sourcesForProperty() collects its uuids from mSources, so passing the
// contains() check implies a source with this uuid is in mSources; the result
// of find_if is dereferenced below without an end() check.
462 auto theSource = find_if(mSources.begin(), mSources.end(),[&sourceUuid](const std::set<AbstractSource*>::value_type & itr)
464 return (itr)->uuid() == sourceUuid;
467 return (*theSource)->getPropertyInfo(property);
// Collects the uuids of all registered sources whose supported() list contains
// the given property.
// @param property  property to look for.
// @return vector of source uuids (presumably `list`; the return statement is not
//         visible in this listing).
470 std::vector<string> Core::sourcesForProperty(const VehicleProperty::Property & property)
472 std::vector<std::string> list;
474 for(auto src : mSources)
476 if(contains(src->supported(), property))
477 list.push_back(src->uuid());
// Re-reads what every registered source supports: calls updateSupported() once
// per source with that source's full supported() list as "added" and an empty
// "removed" list.
483 void Core::inspectSupported()
485 for(AbstractSource* src : mSources)
487 updateSupported(src->supported(), PropertyList(), src);
// Records newly supported properties for a source in mMasterPropertyList.
// @param added   properties the source now supports.
// @param source  owning source; inserted into mSources if not already present.
// @throws std::runtime_error for a null source, per the message below (the
//         guarding condition is not visible in this listing).
491 void Core::handleAddSupported(const PropertyList& added, AbstractSource* source)
494 throw std::runtime_error("Core::handleAddSupported passed a null source");
496 if(!contains(mSources, source))
498 mSources.insert(source);
501 for(auto property : added)
// Skip the insert when this source already provides the property, so the
// multimap holds no duplicate (source, property) pair.
503 if(!sourceForProperty(property, source->uuid()))
504 mMasterPropertyList.emplace(source, property);
506 // Subscribe to property in a new source if such property was subscribed. This catches newly supported properties in the process.
507 if( propertySinkMap.find(property) != propertySinkMap.end()){
508 source->subscribeToPropertyChanges(property);
// Removes properties a source no longer supports from mMasterPropertyList.
// @param removed  properties to drop.
// @param source   source whose entries are searched; any validation performed
//                 before the lookup is not visible in this listing.
513 void Core::handleRemoveSupported(const PropertyList& removed, AbstractSource* source)
// NOTE(review): `range` is computed once, before the loop that erases
// elements. Erasing the element range.first points at would invalidate that
// iterator for later iterations -- confirm whether the range is refreshed in
// the lines not visible here.
518 auto range = mMasterPropertyList.equal_range(source);
519 for(auto itr = removed.begin(); itr != removed.end(); ++itr)
522 // TODO: We do not have info about all subscribed sources in
523 // std::unordered_map<VehicleProperty::Property, std::map<AbstractSink*, std::string> > propertySinkMap
524 // so we do not know if we can/should remove property from propertySinkMap,
525 // but I suppose this should be handled by each AbstractSink implementation in a callback AbstractSink::supportedChanged().
527 const VehicleProperty::Property property(*itr);
// Search only this source's entries for the property (the call that takes
// these three arguments is not visible here).
530 range.first, // the first property in source
531 range.second, // one item right after the last property in source
532 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
535 if (it != range.second)// property was found
537 mMasterPropertyList.erase(it);// References and iterators to the erased elements are invalidated. Other iterators and references are not invalidated.
539 // TODO: Do we need to unsubscribe here ???
// Finds a source that provides a property.
// @param property          property to look up in mMasterPropertyList.
// @param sourceUuidFilter  when empty, the whole master list is searched; when
//                          non-empty, only the entries of the source whose
//                          uuid() equals the filter are searched.
// @return AbstractSource*; the return statements are not visible in this
//         listing (presumably null when nothing matches -- callers in this file
//         test the result for null).
544 AbstractSource* Core::sourceForProperty(const VehicleProperty::Property& property, const std::string& sourceUuidFilter) const
// end() doubles as the "not found" marker tested at the bottom.
546 auto it = mMasterPropertyList.end();
547 if(sourceUuidFilter.empty()){
548 it = std::find_if(mMasterPropertyList.begin(), mMasterPropertyList.end(),
549 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
// Filtered lookup: resolve the uuid to a source first, then search only that
// source's entries.
553 auto itSource = find_if(mSources.begin(),mSources.end(),[&sourceUuidFilter](const std::set<AbstractSource*>::value_type & it)
555 return (it)->uuid() == sourceUuidFilter;
557 if(itSource != mSources.end()){
558 auto range = mMasterPropertyList.equal_range(*itSource);
560 range.first, // the first property in source
561 range.second, // one item right after the last property in source
562 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it)
564 return it.second == property; }
567 if (temp != range.second)// property was found
572 if(it == mMasterPropertyList.end())