reverted varianttype
[profile/ivi/automotive-message-broker.git] / ambd / core.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "core.h"
21 #include <functional>
22 #include <glib.h>
23 #include "listplusplus.h"
24 #include "debugout.h"
25
26 using namespace std::placeholders;
27
28 int lastpps=0;
29 int lastfpps=0;
30
31 static int PPSUpdate(void* data)
32 {
33         Core::Performance *performance = (Core::Performance*)data;
34
35         if(performance->propertiesPerSecond > 0 && performance->propertiesPerSecond != lastpps)
36         {
37                 lastpps = performance->propertiesPerSecond;
38                 DebugOut(1)<<"Property updates per second: "<<performance->propertiesPerSecond<<endl;
39         }
40
41         performance->propertiesPerSecond = 0;
42
43         if(performance->firedPropertiesPerSecond > 0 && performance->firedPropertiesPerSecond != lastfpps)
44         {
45                 lastfpps = performance->firedPropertiesPerSecond;
46                 DebugOut(1)<<"Fired property updates per second: "<<performance->firedPropertiesPerSecond<<endl;
47         }
48
49         performance->firedPropertiesPerSecond = 0;
50
51         return 1;
52 }
53
54 Core::Core(std::map<string, string> config): AbstractRoutingEngine(config), handleCount(0)
55 {
56         g_timeout_add(1000,PPSUpdate,&performance);
57
58         auto simpleCb = [this](amb::Queue<AbstractPropertyType*, amb::PropertyCompare>* q)
59         {
60                 while(q->count())
61                 {
62                         AbstractPropertyType* value = q->pop();
63                         updateProperty(value);
64                 }
65         };
66
67         int hpqs = 0;
68         int lpqs = 0;
69         int npqs = 0;
70
71         if(config.find("highPriorityQueueSize") != config.end())
72         {
73                 hpqs = boost::lexical_cast<int, std::string>(config["highPriorityQueueSize"]);
74         }
75
76         if(config.find("normalPriorityQueueSize") != config.end())
77         {
78                 npqs = boost::lexical_cast<int, std::string>(config["normalPriorityQueueSize"]);
79         }
80
81         if(config.find("lowPriorityQueueSize") != config.end())
82         {
83                 lpqs = boost::lexical_cast<int, std::string>(config["lowPriorityQueueSize"]);
84         }
85
86         watcherPtr = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueue, simpleCb, npqs);
87         watcherPtrLow = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueLow,
88                                                                                                                                           simpleCb, lpqs,
89                                                                                                                                           AbstractPropertyType::Low);
90         watcherPtrHigh = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueHigh,
91                                                                                                                                            simpleCb, hpqs,
92                                                                                                                                            AbstractPropertyType::High);
93
94 }
95
96 Core::~Core()
97 {
98         delete watcherPtr;
99
100         for(auto itr = mSinks.begin(); itr != mSinks.end(); ++itr)
101         {
102                 delete *itr;
103                 itr = mSinks.begin();
104         }
105         mSinks.clear();
106 }
107
108 void Core::registerSource(AbstractSource *source)
109 {
110         mSources.insert(source);
111 }
112
113 void Core::updateSupported(PropertyList added, PropertyList removed, AbstractSource* source)
114 {
115         if(!source || mSources.find(source) == mSources.end())
116                 return;
117
118         /// add the newly supported to master list
119
120         if(added.size())
121                 handleAddSupported(added, source);
122
123         /// removed no longer supported properties from master list.
124         if(removed.size())
125                 handleRemoveSupported(removed, source);
126
127         /// tell all sinks about the newly supported properties.
128
129         PropertyList s = supported();
130
131         if(!s.size()) return;
132
133         for(auto sink : mSinks)
134         {
135                 sink->supportedChanged(s);
136         }
137 }
138
139 PropertyList Core::supported()
140 {
141         PropertyList supportedProperties;
142
143         std::transform(mMasterPropertyList.begin(), mMasterPropertyList.end(), std::back_inserter(supportedProperties),
144                           [](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& itr)
145         {
146                 return itr.second;
147         });
148
149         // remove duplicates:
150         std::sort(supportedProperties.begin(), supportedProperties.end());
151         auto itr = std::unique(supportedProperties.begin(), supportedProperties.end());
152
153         supportedProperties.erase(itr,supportedProperties.end());
154
155         return supportedProperties;
156 }
157
158 void Core::updateProperty(AbstractPropertyType *value, const string & uuid)
159 {
160         if(value->sourceUuid != uuid)
161         {
162                 value->sourceUuid = uuid;
163         }
164
165         if(value->priority == AbstractPropertyType::Instant)
166                 updateProperty(value);
167         else if(value->priority == AbstractPropertyType::High)
168         {
169                 value->destroyed.push_back([this](AbstractPropertyType* v)
170                 {
171                         updatePropertyQueueHigh.remove(v);
172                 });
173                 updatePropertyQueueHigh.append(value);
174         }
175         else if(value->priority == AbstractPropertyType::Normal)
176         {
177                 value->destroyed.push_back([this](AbstractPropertyType* v)
178                 {
179                         updatePropertyQueue.remove(v);
180                 });
181                 updatePropertyQueue.append(value);
182         }
183         else if(value->priority == AbstractPropertyType::Low)
184         {
185                 value->destroyed.push_back([this](AbstractPropertyType* v)
186                 {
187                         updatePropertyQueueLow.remove(v);
188                 });
189                 updatePropertyQueueLow.append(value);
190         }
191 }
192
193 void Core::updateProperty(AbstractPropertyType * value)
194 {
195         VehicleProperty::Property & property = value->name;
196         const string & uuid = value->sourceUuid;
197
198         performance.propertiesPerSecond++;
199
200         auto filteredSourceSinkMapIt =  propertySinkMap.find(property);
201         auto cbMapItr = propertyCbMap.find(property);
202
203         if(filteredSourceSinkMapIt != propertySinkMap.end())
204         {
205                 const FilteredSourceSinkMap & filteredSourceSinks = filteredSourceSinkMapIt->second;
206
207                 DebugOut()<<__FUNCTION__<<"() there are "<<filteredSourceSinks.size()<<" sinks connected to property: "<<property<<endl;
208
209                 performance.firedPropertiesPerSecond++;
210
211                 for(auto itr = filteredSourceSinks.begin(); itr != filteredSourceSinks.end(); ++itr)
212                 {
213                         AbstractSink* sink = itr->first;
214                         const std::string & sourceUuid = itr->second;
215
216                         bool isFiltered = !sourceUuid.empty();
217
218                         if(isFiltered)
219                         {
220                                 DebugOut()<<"Property ("<<property<<") for sink is filtered for source: "<<sourceUuid<<endl;
221                         }
222
223                         if( !isFiltered || sourceUuid == uuid)
224                         {
225                                 sink->propertyChanged(value);
226                         }
227                 }
228         }
229         else
230         {
231                 DebugOut()<<__FUNCTION__<<"() there are no sinks connected to property: "<<property<<endl;
232         }
233
234         if(cbMapItr != propertyCbMap.end())
235         {
236                 FilteredSourceCbMap cbs = (*cbMapItr).second;
237
238                 for(auto itr : cbs)
239                 {
240                         uint handle = itr.first;
241                         const std::string& sourceUuid = itr.second;
242
243                         bool isFiltered = !sourceUuid.empty();
244
245                         if(isFiltered)
246                         {
247                                 DebugOut()<<"Property ("<<property<<") for cb is filtered for source: "<<sourceUuid<<endl;
248                         }
249
250                         if( !isFiltered || sourceUuid == uuid)
251                         {
252                                 if(handleCbMap.count(handle))
253                                 {
254                                         auto cb = handleCbMap[handle];
255                                         try
256                                         {
257                                                 cb(value);
258                                         }
259                                         catch(...)
260                                         {
261                                                 DebugOut(DebugOut::Warning)<<"Failed to call callback subscribed to property: "<<property<<endl;
262                                         }
263                                 }
264                         }
265                 }
266         }
267         else
268         {
269                 DebugOut()<<__FUNCTION__<<"() there are no cb connected to property: "<<property<<endl;
270                 return;
271         }
272 }
273
274 void Core::registerSink(AbstractSink *self)
275 {
276         mSinks.insert(self);
277 }
278
279 void Core::unregisterSink(AbstractSink *self)
280 {
281         mSinks.erase(self);
282 }
283
284 AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
285 {
286         AbstractSource* source = sourceForProperty(request.property, request.sourceUuidFilter);
287
288         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
289
290         if(!source || ((source->supportedOperations() & AbstractSource::Get) != AbstractSource::Get)) { // not found or doesn't support AbstractSource::Get
291                 // Don't wait until timer expire, complete with error here.
292                 reply->error = AsyncPropertyReply::InvalidOperation;
293                 if(request.completed)
294                         request.completed(reply);
295         }
296         else{
297                 source->getPropertyAsync(reply);
298
299         }
300
301         /** right now the owner of the reply becomes the requestor that called this method.
302    *  reply will become invalid after the first reply. */
303         return reply;
304 }
305
306 void Core::getRangePropertyAsync(AsyncRangePropertyRequest request)
307 {
308         AsyncRangePropertyReply * reply = new AsyncRangePropertyReply(request);
309
310         bool anySupport = false;
311         for(auto src : mSources)
312         {
313                 if(((src->supportedOperations() & AbstractSource::GetRanged) == AbstractSource::GetRanged))
314                 {
315                         anySupport = true;
316                         src->getRangePropertyAsync(reply);
317                 }
318         }
319
320         if(!anySupport)
321         {
322                 reply->success = false;
323                 reply->error = AsyncPropertyReply::InvalidOperation;
324                 reply->completed(reply);
325         }
326 }
327
328 AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
329 {
330         AbstractSource* src = sourceForProperty(request.property, request.sourceUuidFilter);
331
332         if(src && ((src->supportedOperations() & AbstractSource::Set) == AbstractSource::Set))
333                 return src->setProperty(request);
334
335         DebugOut(DebugOut::Warning)<<"Error: setProperty opration failed.  Property may not be supported: "<<request.property<<endl;
336         return NULL;
337 }
338
339 bool Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractSink* sink)
340 {
341         auto sinksIt = propertySinkMap.find(property);
342         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
343         {
344                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
345                 return false;
346         }
347
348         DebugOut(1)<<"Subscribing to: "<<property<<endl;
349
350         bool subscribed(false);
351         auto itr = mMasterPropertyList.begin();
352         while(itr != mMasterPropertyList.end())
353         {
354                 VehicleProperty::Property prop = itr->second;
355                 if(prop == property) {
356                         AbstractSource* src = itr->first;
357                         src->subscribeToPropertyChanges(property);
358                         // Move to next source. It will skip all the remaining properties in this source.
359                         itr = mMasterPropertyList.upper_bound(src);
360                         subscribed = true;
361                 }
362                 else{
363                         ++itr;
364                 }
365         }
366
367         //if(subscribed)
368         propertySinkMap[property].emplace(sink, std::string(""));
369
370         return subscribed;
371 }
372
373 bool Core::subscribeToProperty(const VehicleProperty::Property &property, const string &sourceUuidFilter, AbstractSink *sink)
374 {
375         auto sinksIt = propertySinkMap.find(property);
376         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
377         {
378                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
379                 return false;
380         }
381
382         DebugOut(1)<<"Subscribing to: "<<property<<endl;
383
384         AbstractSource* src = sourceForProperty(property, sourceUuidFilter);
385         if(!src)
386                 return false;
387
388         propertySinkMap[property].emplace(sink, sourceUuidFilter);
389
390         src->subscribeToPropertyChanges(property);
391
392         return true;
393 }
394
395 bool Core::subscribeToProperty(const VehicleProperty::Property &, const string & sourceUuidFilter, Zone::Type zoneFilter, AbstractSink *sink)
396 {
397         /// TODO: implement
398         throw std::runtime_error("Not implemented");
399 }
400
401 uint Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractRoutingEngine::PropertyChangedType cb, std::string pid)
402 {
403         DebugOut(1)<<"Subscribing to: "<<property<<endl;
404
405         auto itr = mMasterPropertyList.begin();
406         while(itr != mMasterPropertyList.end())
407         {
408                 VehicleProperty::Property prop = itr->second;
409                 if(prop == property) {
410                         AbstractSource* src = itr->first;
411                         src->subscribeToPropertyChanges(property);
412                         // Move to next source. It will skip all the remaining properties in this source.
413                         itr = mMasterPropertyList.upper_bound(src);
414                 }
415                 else{
416                         ++itr;
417                 }
418         }
419
420         handleCbMap[++handleCount] = cb;
421         propertyCbMap[property].emplace(handleCount, std::string(""));
422         return handleCount;
423 }
424
425 bool Core::unsubscribeToProperty(const VehicleProperty::Property & property, AbstractSink* sink)
426 {
427         auto sinksIt = propertySinkMap.find(property);
428         if(sinksIt == propertySinkMap.end())
429         {
430                 DebugOut(1)<<__FUNCTION__<<" property not subscribed to: "<<property<<endl;
431                 return false;
432         }
433
434         sinksIt->second.erase(sink);
435
436         /// Now we check to see if this is the last subscriber
437         if(sinksIt->second.empty())
438         {
439                 propertySinkMap.erase(sinksIt);
440                 auto itr = mMasterPropertyList.begin();
441                 while(itr != mMasterPropertyList.end())
442                 {
443                         if(itr->second == property) {
444                                 AbstractSource* src = itr->first;
445                                 src->unsubscribeToPropertyChanges(property);
446                                 // Move to next source. It will skip all the remaining properties in this source.
447                                 itr = mMasterPropertyList.upper_bound(src);
448                         }
449                         else{
450                                 ++itr;
451                         }
452                 }
453         }
454
455         return true;
456 }
457
458 void Core::unsubscribeToProperty(uint handle)
459 {
460         handleCbMap.erase(handle);
461         /// TODO: unsubscribe from source
462 }
463
464 PropertyInfo Core::getPropertyInfo(const VehicleProperty::Property &property, const string &sourceUuid)
465 {
466         if(sourceUuid == "")
467                 return PropertyInfo::invalid();
468
469         auto srcs = sourcesForProperty(property);
470
471         if(!contains(srcs, sourceUuid))
472                 return PropertyInfo::invalid();
473
474         auto theSource = find_if(mSources.begin(), mSources.end(),[&sourceUuid](const std::set<AbstractSource*>::value_type & itr)
475         {
476                 return (itr)->uuid() == sourceUuid;
477         });
478
479         return (*theSource)->getPropertyInfo(property);
480 }
481
482 std::vector<string> Core::sourcesForProperty(const VehicleProperty::Property & property)
483 {
484         std::vector<std::string> list;
485
486         for(auto src : mSources)
487         {
488                 if(contains(src->supported(), property))
489                         list.push_back(src->uuid());
490         }
491
492         return list;
493 }
494
495 void Core::inspectSupported()
496 {
497         for(AbstractSource* src : mSources)
498         {
499                 updateSupported(src->supported(), PropertyList(), src);
500         }
501 }
502
503 void Core::handleAddSupported(const PropertyList& added, AbstractSource* source)
504 {
505         if(!source)
506                 throw std::runtime_error("Core::handleAddSupported passed a null source");
507
508         if(!contains(mSources, source))
509         {
510                 mSources.insert(source);
511         }
512
513         for(auto property : added)
514         {
515                 if(!sourceForProperty(property, source->uuid()))
516                         mMasterPropertyList.emplace(source, property);
517
518                 // Subscribe to property in a new source if such property was subscribed. This catches newly supported properties in the process.
519                 if( propertySinkMap.find(property) != propertySinkMap.end()){
520                         source->subscribeToPropertyChanges(property);
521                 }
522         }
523 }
524
525 void Core::handleRemoveSupported(const PropertyList& removed, AbstractSource* source)
526 {
527         if(!source)
528                 return;
529
530         auto range = mMasterPropertyList.equal_range(source);
531         for(auto itr = removed.begin(); itr != removed.end(); ++itr)
532         {
533                 //
534                 // TODO: We do not have info about all subscribed sources in
535                 // std::unordered_map<VehicleProperty::Property, std::map<AbstractSink*, std::string> > propertySinkMap
536                 // so we do not know if we can/should remove property from propertySinkMap,
537                 // but I suppose this should be handled by each AbstractSink implementation in a callback AbstractSink::supportedChanged().
538
539                 const VehicleProperty::Property property(*itr);
540
541                 auto it = find_if(
542                                         range.first,    // the first property in source
543                                         range.second,   // one item right after the last property in source
544                                         [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
545                 );
546
547                 if (it != range.second)// property was found
548                 {
549                         mMasterPropertyList.erase(it);// References and iterators to the erased elements are invalidated. Other iterators and references are not invalidated.
550
551                         // TODO: Do we need to unsubscribe here ???
552                 }
553         }
554 }
555
556 AbstractSource* Core::sourceForProperty(const VehicleProperty::Property& property, const std::string& sourceUuidFilter) const
557 {
558         auto it = mMasterPropertyList.end();
559         if(sourceUuidFilter.empty()){
560                 it = std::find_if(mMasterPropertyList.begin(), mMasterPropertyList.end(),
561                                                   [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
562                 );
563         }
564         else{
565                 auto itSource = find_if(mSources.begin(),mSources.end(),[&sourceUuidFilter](const std::set<AbstractSource*>::value_type & it)
566                 {
567                         return (it)->uuid() == sourceUuidFilter;
568                 });
569                 if(itSource != mSources.end()){
570                         auto range = mMasterPropertyList.equal_range(*itSource);
571                         auto temp = find_if(
572                                                 range.first,    // the first property in source
573                                                 range.second,   // one item right after the last property in source
574                                                 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it)
575                         {
576                                 return it.second == property; }
577                         );
578
579                         if (temp != range.second)// property was found
580                                 it = temp;
581                 }
582         }
583
584         if(it == mMasterPropertyList.end())
585                 return nullptr;
586         else
587                 return it->first;
588 }
589