979cf77876e712298e369844d8aa0903a6c64019
[profile/ivi/automotive-message-broker.git] / ambd / core.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "core.h"
21 #include <functional>
22 #include <glib.h>
23 #include "listplusplus.h"
24 #include "debugout.h"
25
26 using namespace std::placeholders;
27
28 int lastpps=0;
29 int lastfpps=0;
30
31 static int PPSUpdate(void* data)
32 {
33         Core::Performance *performance = (Core::Performance*)data;
34
35         if(performance->propertiesPerSecond > 0 && performance->propertiesPerSecond != lastpps)
36         {
37                 lastpps = performance->propertiesPerSecond;
38                 DebugOut(1)<<"Property updates per second: "<<performance->propertiesPerSecond<<endl;
39         }
40
41         performance->propertiesPerSecond = 0;
42
43         if(performance->firedPropertiesPerSecond > 0 && performance->firedPropertiesPerSecond != lastfpps)
44         {
45                 lastfpps = performance->firedPropertiesPerSecond;
46                 DebugOut(1)<<"Fired property updates per second: "<<performance->firedPropertiesPerSecond<<endl;
47         }
48
49         performance->firedPropertiesPerSecond = 0;
50
51         return 1;
52 }
53
54 Core::Core(std::map<string, string> config): AbstractRoutingEngine(config), handleCount(0)
55 {
56         g_timeout_add(1000,PPSUpdate,&performance);
57
58         auto simpleCb = [this](amb::Queue<AbstractPropertyType*, amb::PropertyCompare>* q)
59         {
60                 while(q->count())
61                 {
62                         AbstractPropertyType* value = q->pop();
63                         updateProperty(value);
64                 }
65         };
66
67         int hpqs = 0;
68         int lpqs = 0;
69         int npqs = 0;
70
71         if(config.find("highPriorityQueueSize") != config.end())
72         {
73                 hpqs = boost::lexical_cast<int, std::string>(config["highPriorityQueueSize"]);
74         }
75
76         if(config.find("normalPriorityQueueSize") != config.end())
77         {
78                 npqs = boost::lexical_cast<int, std::string>(config["normalPriorityQueueSize"]);
79         }
80
81         if(config.find("lowPriorityQueueSize") != config.end())
82         {
83                 lpqs = boost::lexical_cast<int, std::string>(config["lowPriorityQueueSize"]);
84         }
85
86         watcherPtr = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueue, simpleCb, npqs);
87         watcherPtrLow = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueLow,
88                                                                                                                                           simpleCb, lpqs,
89                                                                                                                                           AbstractPropertyType::Low);
90         watcherPtrHigh = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueHigh,
91                                                                                                                                            simpleCb, hpqs,
92                                                                                                                                            AbstractPropertyType::High);
93
94 }
95
96 Core::~Core()
97 {
98         delete watcherPtr;
99
100         for(auto itr = mSinks.begin(); itr != mSinks.end(); ++itr)
101         {
102                 delete *itr;
103                 itr = mSinks.begin();
104         }
105         mSinks.clear();
106 }
107
108 void Core::registerSource(AbstractSource *source)
109 {
110         mSources.insert(source);
111 }
112
113 void Core::updateSupported(PropertyList added, PropertyList removed, AbstractSource* source)
114 {
115         if(!source || mSources.find(source) == mSources.end())
116                 return;
117
118         /// add the newly supported to master list
119
120         if(added.size())
121                 handleAddSupported(added, source);
122
123         /// removed no longer supported properties from master list.
124         if(removed.size())
125                 handleRemoveSupported(removed, source);
126
127         /// tell all sinks about the newly supported properties.
128
129         PropertyList s = supported();
130
131         if(!s.size()) return;
132
133         for(auto sink : mSinks)
134         {
135                 sink->supportedChanged(s);
136         }
137 }
138
139 PropertyList Core::supported()
140 {
141         PropertyList supportedProperties;
142
143         std::transform(mMasterPropertyList.begin(), mMasterPropertyList.end(), std::back_inserter(supportedProperties),
144                           [](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& itr)
145         {
146                 return itr.second;
147         });
148
149         // remove duplicates:
150         std::sort(supportedProperties.begin(), supportedProperties.end());
151         auto itr = std::unique(supportedProperties.begin(), supportedProperties.end());
152
153         supportedProperties.erase(itr,supportedProperties.end());
154
155         return supportedProperties;
156 }
157
158 void Core::updateProperty(AbstractPropertyType *value, const string & uuid)
159 {
160         if(value->sourceUuid != uuid)
161         {
162                 value->sourceUuid = uuid;
163         }
164
165         if(value->priority == AbstractPropertyType::Instant)
166                 updateProperty(value);
167         else if(value->priority == AbstractPropertyType::High)
168         {
169                 value->destroyed.push_back([this](AbstractPropertyType* v)
170                 {
171                         updatePropertyQueueHigh.remove(v);
172                 });
173                 updatePropertyQueueHigh.append(value);
174         }
175         else if(value->priority == AbstractPropertyType::Normal)
176         {
177                 value->destroyed.push_back([this](AbstractPropertyType* v)
178                 {
179                         updatePropertyQueue.remove(v);
180                 });
181                 updatePropertyQueue.append(value);
182         }
183         else if(value->priority == AbstractPropertyType::Low)
184         {
185                 value->destroyed.push_back([this](AbstractPropertyType* v)
186                 {
187                         updatePropertyQueueLow.remove(v);
188                 });
189                 updatePropertyQueueLow.append(value);
190         }
191 }
192
193 void Core::updateProperty(AbstractPropertyType * value)
194 {
195         VehicleProperty::Property & property = value->name;
196         const string & uuid = value->sourceUuid;
197
198         performance.propertiesPerSecond++;
199
200         //auto sinks = propertySinkMap[property]; !!! this will insert empty std::set<AbstractSink*> into propertySinkMap !!!
201         auto filteredSourceSinkMapIt =  propertySinkMap.find(property);
202         auto cbMapItr = propertyCbMap.find(property);
203
204         if(filteredSourceSinkMapIt != propertySinkMap.end())
205         {
206                 const FilteredSourceSinkMap & filteredSourceSinks = filteredSourceSinkMapIt->second;
207
208                 DebugOut()<<__FUNCTION__<<"() there are "<<filteredSourceSinks.size()<<" sinks connected to property: "<<property<<endl;
209
210                 performance.firedPropertiesPerSecond++;
211
212                 for(auto itr = filteredSourceSinks.begin(); itr != filteredSourceSinks.end(); ++itr)
213                 {
214                         AbstractSink* sink = itr->first;
215                         const std::string & sourceUuid = itr->second;
216
217                         bool isFiltered = !sourceUuid.empty();
218
219                         if(isFiltered)
220                         {
221                                 DebugOut()<<"Property ("<<property<<") for sink is filtered for source: "<<sourceUuid<<endl;
222                         }
223
224                         if( !isFiltered || sourceUuid == uuid)
225                         {
226                                 sink->propertyChanged(value);
227                         }
228                 }
229         }
230         else
231         {
232                 DebugOut()<<__FUNCTION__<<"() there are no sinks connected to property: "<<property<<endl;
233         }
234
235         if(cbMapItr != propertyCbMap.end())
236         {
237                 FilteredSourceCbMap cbs = (*cbMapItr).second;
238
239                 for(auto itr : cbs)
240                 {
241                         uint handle = itr.first;
242                         const std::string& sourceUuid = itr.second;
243
244                         bool isFiltered = !sourceUuid.empty();
245
246                         if(isFiltered)
247                         {
248                                 DebugOut()<<"Property ("<<property<<") for cb is filtered for source: "<<sourceUuid<<endl;
249                         }
250
251                         if( !isFiltered || sourceUuid == uuid)
252                         {
253                                 if(handleCbMap.count(handle))
254                                 {
255                                         auto cb = handleCbMap[handle];
256                                         try
257                                         {
258                                                 cb(value);
259                                         }
260                                         catch(...)
261                                         {
262                                                 DebugOut(DebugOut::Warning)<<"Failed to call callback subscribed to property: "<<property<<endl;
263                                         }
264                                 }
265                         }
266                 }
267         }
268         else
269         {
270                 DebugOut()<<__FUNCTION__<<"() there are no cb connected to property: "<<property<<endl;
271                 return;
272         }
273 }
274
275 void Core::registerSink(AbstractSink *self)
276 {
277         mSinks.insert(self);
278 }
279
280 void Core::unregisterSink(AbstractSink *self)
281 {
282         mSinks.erase(self);
283 }
284
285 AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
286 {
287         AbstractSource* source = sourceForProperty(request.property, request.sourceUuidFilter);
288
289         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
290
291         if(!source || ((source->supportedOperations() & AbstractSource::Get) != AbstractSource::Get)) { // not found or doesn't support AbstractSource::Get
292                 // Don't wait until timer expire, complete with error here.
293                 reply->error = AsyncPropertyReply::InvalidOperation;
294                 if(request.completed)
295                         request.completed(reply);
296         }
297         else{
298                 source->getPropertyAsync(reply);
299
300         }
301
302         /** right now the owner of the reply becomes the requestor that called this method.
303    *  reply will become invalid after the first reply. */
304         return reply;
305 }
306
307 void Core::getRangePropertyAsync(AsyncRangePropertyRequest request)
308 {
309         AsyncRangePropertyReply * reply = new AsyncRangePropertyReply(request);
310
311         bool anySupport = false;
312         for(auto src : mSources)
313         {
314                 if(((src->supportedOperations() & AbstractSource::GetRanged) == AbstractSource::GetRanged))
315                 {
316                         anySupport = true;
317                         src->getRangePropertyAsync(reply);
318                 }
319         }
320
321         if(!anySupport)
322         {
323                 reply->success = false;
324                 reply->error = AsyncPropertyReply::InvalidOperation;
325                 reply->completed(reply);
326         }
327 }
328
329 AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
330 {
331         AbstractSource* src = sourceForProperty(request.property, request.sourceUuidFilter);
332
333         if(src && ((src->supportedOperations() & AbstractSource::Set) == AbstractSource::Set))
334                 return src->setProperty(request);
335
336         DebugOut(DebugOut::Warning)<<"Error: setProperty opration failed.  Property may not be supported: "<<request.property<<endl;
337         return NULL;
338 }
339
340 bool Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractSink* sink)
341 {
342         auto sinksIt = propertySinkMap.find(property);
343         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
344         {
345                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
346                 return false;
347         }
348
349         DebugOut(1)<<"Subscribing to: "<<property<<endl;
350
351         bool subscribed(false);
352         auto itr = mMasterPropertyList.begin();
353         while(itr != mMasterPropertyList.end())
354         {
355                 VehicleProperty::Property prop = itr->second;
356                 if(prop == property) {
357                         AbstractSource* src = itr->first;
358                         src->subscribeToPropertyChanges(property);
359                         // Move to next source. It will skip all the remaining properties in this source.
360                         itr = mMasterPropertyList.upper_bound(src);
361                         subscribed = true;
362                 }
363                 else{
364                         ++itr;
365                 }
366         }
367
368         //if(subscribed)
369         propertySinkMap[property].emplace(sink, std::string(""));
370
371         return subscribed;
372 }
373
374 bool Core::subscribeToProperty(const VehicleProperty::Property &property, const string &sourceUuidFilter, AbstractSink *sink)
375 {
376         auto sinksIt = propertySinkMap.find(property);
377         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
378         {
379                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
380                 return false;
381         }
382
383         DebugOut(1)<<"Subscribing to: "<<property<<endl;
384
385         AbstractSource* src = sourceForProperty(property, sourceUuidFilter);
386         if(!src)
387                 return false;
388
389         propertySinkMap[property].emplace(sink, sourceUuidFilter);
390
391         src->subscribeToPropertyChanges(property);
392
393         return true;
394 }
395
396 bool Core::subscribeToProperty(const VehicleProperty::Property &, const string & sourceUuidFilter, Zone::Type zoneFilter, AbstractSink *sink)
397 {
398         /// TODO: implement
399         throw std::runtime_error("Not implemented");
400 }
401
402 uint Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractRoutingEngine::PropertyChangedType cb, std::string pid)
403 {
404         DebugOut(1)<<"Subscribing to: "<<property<<endl;
405
406         auto itr = mMasterPropertyList.begin();
407         while(itr != mMasterPropertyList.end())
408         {
409                 VehicleProperty::Property prop = itr->second;
410                 if(prop == property) {
411                         AbstractSource* src = itr->first;
412                         src->subscribeToPropertyChanges(property);
413                         // Move to next source. It will skip all the remaining properties in this source.
414                         itr = mMasterPropertyList.upper_bound(src);
415                 }
416                 else{
417                         ++itr;
418                 }
419         }
420
421         handleCbMap[++handleCount] = cb;
422         propertyCbMap[property].emplace(handleCount, std::string(""));
423         return handleCount;
424 }
425
426 bool Core::unsubscribeToProperty(const VehicleProperty::Property & property, AbstractSink* sink)
427 {
428         auto sinksIt = propertySinkMap.find(property);
429         if(sinksIt == propertySinkMap.end())
430         {
431                 DebugOut(1)<<__FUNCTION__<<" property not subscribed to: "<<property<<endl;
432                 return false;
433         }
434
435         sinksIt->second.erase(sink);
436
437         /// Now we check to see if this is the last subscriber
438         if(sinksIt->second.empty())
439         {
440                 propertySinkMap.erase(sinksIt);
441                 auto itr = mMasterPropertyList.begin();
442                 while(itr != mMasterPropertyList.end())
443                 {
444                         if(itr->second == property) {
445                                 AbstractSource* src = itr->first;
446                                 src->unsubscribeToPropertyChanges(property);
447                                 // Move to next source. It will skip all the remaining properties in this source.
448                                 itr = mMasterPropertyList.upper_bound(src);
449                         }
450                         else{
451                                 ++itr;
452                         }
453                 }
454         }
455
456         return true;
457 }
458
459 void Core::unsubscribeToProperty(uint handle)
460 {
461         handleCbMap.erase(handle);
462         /// TODO: unsubscribe from source
463 }
464
465 PropertyInfo Core::getPropertyInfo(const VehicleProperty::Property &property, const string &sourceUuid)
466 {
467         if(sourceUuid == "")
468                 return PropertyInfo::invalid();
469
470         auto srcs = sourcesForProperty(property);
471
472         if(!contains(srcs, sourceUuid))
473                 return PropertyInfo::invalid();
474
475         auto theSource = find_if(mSources.begin(), mSources.end(),[&sourceUuid](const std::set<AbstractSource*>::value_type & itr)
476         {
477                 return (itr)->uuid() == sourceUuid;
478         });
479
480         return (*theSource)->getPropertyInfo(property);
481 }
482
483 std::vector<string> Core::sourcesForProperty(const VehicleProperty::Property & property)
484 {
485         std::vector<std::string> list;
486
487         for(auto src : mSources)
488         {
489                 if(contains(src->supported(), property))
490                         list.push_back(src->uuid());
491         }
492
493         return list;
494 }
495
496 void Core::inspectSupported()
497 {
498         for(AbstractSource* src : mSources)
499         {
500                 updateSupported(src->supported(), PropertyList(), src);
501         }
502 }
503
504 void Core::handleAddSupported(const PropertyList& added, AbstractSource* source)
505 {
506         if(!source)
507                 throw std::runtime_error("Core::handleAddSupported passed a null source");
508
509         if(!contains(mSources, source))
510         {
511                 mSources.insert(source);
512         }
513
514         for(auto property : added)
515         {
516                 if(!sourceForProperty(property, source->uuid()))
517                         mMasterPropertyList.emplace(source, property);
518
519                 // Subscribe to property in a new source if such property was subscribed. This catches newly supported properties in the process.
520                 if( propertySinkMap.find(property) != propertySinkMap.end()){
521                         source->subscribeToPropertyChanges(property);
522                 }
523         }
524 }
525
526 void Core::handleRemoveSupported(const PropertyList& removed, AbstractSource* source)
527 {
528         if(!source)
529                 return;
530
531         auto range = mMasterPropertyList.equal_range(source);
532         for(auto itr = removed.begin(); itr != removed.end(); ++itr)
533         {
534                 //
535                 // TODO: We do not have info about all subscribed sources in
536                 // std::unordered_map<VehicleProperty::Property, std::map<AbstractSink*, std::string> > propertySinkMap
537                 // so we do not know if we can/should remove property from propertySinkMap,
538                 // but I suppose this should be handled by each AbstractSink implementation in a callback AbstractSink::supportedChanged().
539
540                 const VehicleProperty::Property property(*itr);
541
542                 auto it = find_if(
543                                         range.first,    // the first property in source
544                                         range.second,   // one item right after the last property in source
545                                         [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
546                 );
547
548                 if (it != range.second)// property was found
549                 {
550                         mMasterPropertyList.erase(it);// References and iterators to the erased elements are invalidated. Other iterators and references are not invalidated.
551
552                         // TODO: Do we need to unsubscribe here ???
553                 }
554         }
555 }
556
557 AbstractSource* Core::sourceForProperty(const VehicleProperty::Property& property, const std::string& sourceUuidFilter) const
558 {
559         auto it = mMasterPropertyList.end();
560         if(sourceUuidFilter.empty()){
561                 it = std::find_if(mMasterPropertyList.begin(), mMasterPropertyList.end(),
562                                                   [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
563                 );
564         }
565         else{
566                 auto itSource = find_if(mSources.begin(),mSources.end(),[&sourceUuidFilter](const std::set<AbstractSource*>::value_type & it)
567                 {
568                         return (it)->uuid() == sourceUuidFilter;
569                 });
570                 if(itSource != mSources.end()){
571                         auto range = mMasterPropertyList.equal_range(*itSource);
572                         auto temp = find_if(
573                                                 range.first,    // the first property in source
574                                                 range.second,   // one item right after the last property in source
575                                                 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it)
576                         {
577                                 return it.second == property; }
578                         );
579
580                         if (temp != range.second)// property was found
581                                 it = temp;
582                 }
583         }
584
585         if(it == mMasterPropertyList.end())
586                 return nullptr;
587         else
588                 return it->first;
589 }
590