Bugfix: parsing of plugins.d failed with some file-systems
[profile/ivi/automotive-message-broker.git] / ambd / core.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "core.h"
21 #include <functional>
22 #include <glib.h>
23 #include "listplusplus.h"
24 #include "debugout.h"
25
26 using namespace std::placeholders;
27
28 int lastpps=0;
29 int lastfpps=0;
30
31 static int PPSUpdate(void* data)
32 {
33         Core::Performance *performance = (Core::Performance*)data;
34
35         if(performance->propertiesPerSecond > 0 && performance->propertiesPerSecond != lastpps)
36         {
37                 lastpps = performance->propertiesPerSecond;
38                 DebugOut(1)<<"Property updates per second: "<<performance->propertiesPerSecond<<endl;
39         }
40
41         performance->propertiesPerSecond = 0;
42
43         if(performance->firedPropertiesPerSecond > 0 && performance->firedPropertiesPerSecond != lastfpps)
44         {
45                 lastfpps = performance->firedPropertiesPerSecond;
46                 DebugOut(1)<<"Fired property updates per second: "<<performance->firedPropertiesPerSecond<<endl;
47         }
48
49         performance->firedPropertiesPerSecond = 0;
50
51         return 1;
52 }
53
54 Core::Core(std::map<string, string> config): AbstractRoutingEngine(config), handleCount(0)
55 {
56         g_timeout_add(1000,PPSUpdate,&performance);
57
58         auto simpleCb = [this](amb::Queue<AbstractPropertyType*, amb::PropertyCompare>* q)
59         {
60                 while(q->count())
61                 {
62                         AbstractPropertyType* value = q->pop();
63                         updateProperty(value);
64                 }
65         };
66
67         int hpqs = 0;
68         int lpqs = 0;
69         int npqs = 0;
70
71         if(config.find("highPriorityQueueSize") != config.end())
72         {
73                 hpqs = boost::lexical_cast<int, std::string>(config["highPriorityQueueSize"]);
74         }
75
76         if(config.find("normalPriorityQueueSize") != config.end())
77         {
78                 npqs = boost::lexical_cast<int, std::string>(config["normalPriorityQueueSize"]);
79         }
80
81         if(config.find("lowPriorityQueueSize") != config.end())
82         {
83                 lpqs = boost::lexical_cast<int, std::string>(config["lowPriorityQueueSize"]);
84         }
85
86         watcherPtr = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueue, simpleCb, npqs);
87         watcherPtrLow = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueLow,
88                                                                                                                                           simpleCb, lpqs,
89                                                                                                                                           AbstractPropertyType::Low);
90         watcherPtrHigh = new amb::AsyncQueueWatcher<AbstractPropertyType*, amb::PropertyCompare>(&updatePropertyQueueHigh,
91                                                                                                                                            simpleCb, hpqs,
92                                                                                                                                            AbstractPropertyType::High);
93
94 }
95
96 Core::~Core()
97 {
98         delete watcherPtr;
99
100         for(auto itr = mSinks.begin(); itr != mSinks.end(); ++itr)
101         {
102                 delete *itr;
103                 itr = mSinks.begin();
104         }
105         mSinks.clear();
106 }
107
108 void Core::registerSource(AbstractSource *source)
109 {
110         mSources.insert(source);
111 }
112
113 void Core::updateSupported(PropertyList added, PropertyList removed, AbstractSource* source)
114 {
115         if(!source || mSources.find(source) == mSources.end())
116                 return;
117
118         /// add the newly supported to master list
119
120         if(added.size())
121                 handleAddSupported(added, source);
122
123         /// removed no longer supported properties from master list.
124         if(removed.size())
125                 handleRemoveSupported(removed, source);
126
127         /// tell all sinks about the newly supported properties.
128
129         PropertyList s = supported();
130
131         if(!s.size()) return;
132
133         for(auto sink : mSinks)
134         {
135                 sink->supportedChanged(s);
136         }
137 }
138
139 PropertyList Core::supported()
140 {
141         PropertyList supportedProperties;
142
143         std::transform(mMasterPropertyList.begin(), mMasterPropertyList.end(), std::back_inserter(supportedProperties),
144                           [](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& itr)
145         {
146                 return itr.second;
147         });
148
149         // remove duplicates:
150         std::sort(supportedProperties.begin(), supportedProperties.end());
151         auto itr = std::unique(supportedProperties.begin(), supportedProperties.end());
152
153         supportedProperties.erase(itr,supportedProperties.end());
154
155         return supportedProperties;
156 }
157
158 void Core::updateProperty(AbstractPropertyType *value, const string & uuid)
159 {
160         if(value->sourceUuid != uuid)
161         {
162                 value->sourceUuid = uuid;
163         }
164
165         if(value->priority == AbstractPropertyType::Instant)
166                 updateProperty(value);
167         else if(value->priority == AbstractPropertyType::High)
168         {
169                 updatePropertyQueueHigh.append(value);
170         }
171         else if(value->priority == AbstractPropertyType::Normal)
172         {
173                 updatePropertyQueue.append(value);
174         }
175         else if(value->priority == AbstractPropertyType::Low)
176         {
177                 updatePropertyQueueLow.append(value);
178         }
179 }
180
181 void Core::updateProperty(AbstractPropertyType * value)
182 {
183         VehicleProperty::Property & property = value->name;
184         const string & uuid = value->sourceUuid;
185
186         performance.propertiesPerSecond++;
187
188         auto filteredSourceSinkMapIt =  propertySinkMap.find(property);
189         auto cbMapItr = propertyCbMap.find(property);
190
191         if(filteredSourceSinkMapIt != propertySinkMap.end())
192         {
193                 const FilteredSourceSinkMap & filteredSourceSinks = filteredSourceSinkMapIt->second;
194
195                 DebugOut()<<__FUNCTION__<<"() there are "<<filteredSourceSinks.size()<<" sinks connected to property: "<<property<<endl;
196
197                 performance.firedPropertiesPerSecond++;
198
199                 for(auto itr = filteredSourceSinks.begin(); itr != filteredSourceSinks.end(); ++itr)
200                 {
201                         AbstractSink* sink = itr->first;
202                         const std::string & sourceUuid = itr->second;
203
204                         bool isFiltered = !sourceUuid.empty();
205
206                         if(isFiltered)
207                         {
208                                 DebugOut()<<"Property ("<<property<<") for sink is filtered for source: "<<sourceUuid<<endl;
209                         }
210
211                         if( !isFiltered || sourceUuid == uuid)
212                         {
213                                 sink->propertyChanged(value);
214                         }
215                 }
216         }
217         else
218         {
219                 DebugOut()<<__FUNCTION__<<"() there are no sinks connected to property: "<<property<<endl;
220         }
221
222         if(cbMapItr != propertyCbMap.end())
223         {
224                 FilteredSourceCbMap cbs = (*cbMapItr).second;
225
226                 for(auto itr : cbs)
227                 {
228                         uint handle = itr.first;
229                         const std::string& sourceUuid = itr.second;
230
231                         bool isFiltered = !sourceUuid.empty();
232
233                         if(isFiltered)
234                         {
235                                 DebugOut()<<"Property ("<<property<<") for cb is filtered for source: "<<sourceUuid<<endl;
236                         }
237
238                         if( !isFiltered || sourceUuid == uuid)
239                         {
240                                 if(handleCbMap.count(handle))
241                                 {
242                                         auto cb = handleCbMap[handle];
243                                         try
244                                         {
245                                                 cb(value);
246                                         }
247                                         catch(...)
248                                         {
249                                                 DebugOut(DebugOut::Warning)<<"Failed to call callback subscribed to property: "<<property<<endl;
250                                         }
251                                 }
252                         }
253                 }
254         }
255         else
256         {
257                 DebugOut()<<__FUNCTION__<<"() there are no cb connected to property: "<<property<<endl;
258                 return;
259         }
260 }
261
262 void Core::registerSink(AbstractSink *self)
263 {
264         mSinks.insert(self);
265 }
266
267 void Core::unregisterSink(AbstractSink *self)
268 {
269         mSinks.erase(self);
270 }
271
272 AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
273 {
274         AbstractSource* source = sourceForProperty(request.property, request.sourceUuidFilter);
275
276         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
277
278         if(!source || ((source->supportedOperations() & AbstractSource::Get) != AbstractSource::Get)) { // not found or doesn't support AbstractSource::Get
279                 // Don't wait until timer expire, complete with error here.
280                 reply->error = AsyncPropertyReply::InvalidOperation;
281                 if(request.completed)
282                         request.completed(reply);
283         }
284         else{
285                 source->getPropertyAsync(reply);
286
287         }
288
289         /** right now the owner of the reply becomes the requestor that called this method.
290    *  reply will become invalid after the first reply. */
291         return reply;
292 }
293
294 void Core::getRangePropertyAsync(AsyncRangePropertyRequest request)
295 {
296         AsyncRangePropertyReply * reply = new AsyncRangePropertyReply(request);
297
298         bool anySupport = false;
299         for(auto src : mSources)
300         {
301                 if(((src->supportedOperations() & AbstractSource::GetRanged) == AbstractSource::GetRanged))
302                 {
303                         anySupport = true;
304                         src->getRangePropertyAsync(reply);
305                 }
306         }
307
308         if(!anySupport)
309         {
310                 reply->success = false;
311                 reply->error = AsyncPropertyReply::InvalidOperation;
312                 reply->completed(reply);
313         }
314 }
315
316 AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
317 {
318         AbstractSource* src = sourceForProperty(request.property, request.sourceUuidFilter);
319
320         if(src && ((src->supportedOperations() & AbstractSource::Set) == AbstractSource::Set))
321                 return src->setProperty(request);
322
323         DebugOut(DebugOut::Warning)<<"Error: setProperty opration failed.  Property may not be supported: "<<request.property<<endl;
324         return NULL;
325 }
326
327 bool Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractSink* sink)
328 {
329         auto sinksIt = propertySinkMap.find(property);
330         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
331         {
332                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
333                 return false;
334         }
335
336         DebugOut(1)<<"Subscribing to: "<<property<<endl;
337
338         bool subscribed(false);
339         auto itr = mMasterPropertyList.begin();
340         while(itr != mMasterPropertyList.end())
341         {
342                 VehicleProperty::Property prop = itr->second;
343                 if(prop == property) {
344                         AbstractSource* src = itr->first;
345                         src->subscribeToPropertyChanges(property);
346                         // Move to next source. It will skip all the remaining properties in this source.
347                         itr = mMasterPropertyList.upper_bound(src);
348                         subscribed = true;
349                 }
350                 else{
351                         ++itr;
352                 }
353         }
354
355         //if(subscribed)
356         propertySinkMap[property].emplace(sink, std::string(""));
357
358         return subscribed;
359 }
360
361 bool Core::subscribeToProperty(const VehicleProperty::Property &property, const string &sourceUuidFilter, AbstractSink *sink)
362 {
363         auto sinksIt = propertySinkMap.find(property);
364         if(sinksIt != propertySinkMap.end() && sinksIt->second.find(sink) != sinksIt->second.end())
365         {
366                 DebugOut(1)<<__FUNCTION__<<" property " << property << " has already been subscribed." << endl;
367                 return false;
368         }
369
370         DebugOut(1)<<"Subscribing to: "<<property<<endl;
371
372         AbstractSource* src = sourceForProperty(property, sourceUuidFilter);
373         if(!src)
374                 return false;
375
376         propertySinkMap[property].emplace(sink, sourceUuidFilter);
377
378         src->subscribeToPropertyChanges(property);
379
380         return true;
381 }
382
383 bool Core::subscribeToProperty(const VehicleProperty::Property &, const string & sourceUuidFilter, Zone::Type zoneFilter, AbstractSink *sink)
384 {
385         /// TODO: implement
386         throw std::runtime_error("Not implemented");
387 }
388
389 uint Core::subscribeToProperty(const VehicleProperty::Property &property, AbstractRoutingEngine::PropertyChangedType cb, std::string pid)
390 {
391         DebugOut(1)<<"Subscribing to: "<<property<<endl;
392
393         auto itr = mMasterPropertyList.begin();
394         while(itr != mMasterPropertyList.end())
395         {
396                 VehicleProperty::Property prop = itr->second;
397                 if(prop == property) {
398                         AbstractSource* src = itr->first;
399                         src->subscribeToPropertyChanges(property);
400                         // Move to next source. It will skip all the remaining properties in this source.
401                         itr = mMasterPropertyList.upper_bound(src);
402                 }
403                 else{
404                         ++itr;
405                 }
406         }
407
408         handleCbMap[++handleCount] = cb;
409         propertyCbMap[property].emplace(handleCount, std::string(""));
410         return handleCount;
411 }
412
413 bool Core::unsubscribeToProperty(const VehicleProperty::Property & property, AbstractSink* sink)
414 {
415         auto sinksIt = propertySinkMap.find(property);
416         if(sinksIt == propertySinkMap.end())
417         {
418                 DebugOut(1)<<__FUNCTION__<<" property not subscribed to: "<<property<<endl;
419                 return false;
420         }
421
422         sinksIt->second.erase(sink);
423
424         /// Now we check to see if this is the last subscriber
425         if(sinksIt->second.empty())
426         {
427                 propertySinkMap.erase(sinksIt);
428                 auto itr = mMasterPropertyList.begin();
429                 while(itr != mMasterPropertyList.end())
430                 {
431                         if(itr->second == property) {
432                                 AbstractSource* src = itr->first;
433                                 src->unsubscribeToPropertyChanges(property);
434                                 // Move to next source. It will skip all the remaining properties in this source.
435                                 itr = mMasterPropertyList.upper_bound(src);
436                         }
437                         else{
438                                 ++itr;
439                         }
440                 }
441         }
442
443         return true;
444 }
445
446 void Core::unsubscribeToProperty(uint handle)
447 {
448         handleCbMap.erase(handle);
449         /// TODO: unsubscribe from source
450 }
451
452 PropertyInfo Core::getPropertyInfo(const VehicleProperty::Property &property, const string &sourceUuid)
453 {
454         if(sourceUuid == "")
455                 return PropertyInfo::invalid();
456
457         auto srcs = sourcesForProperty(property);
458
459         if(!contains(srcs, sourceUuid))
460                 return PropertyInfo::invalid();
461
462         auto theSource = find_if(mSources.begin(), mSources.end(),[&sourceUuid](const std::set<AbstractSource*>::value_type & itr)
463         {
464                 return (itr)->uuid() == sourceUuid;
465         });
466
467         return (*theSource)->getPropertyInfo(property);
468 }
469
470 std::vector<string> Core::sourcesForProperty(const VehicleProperty::Property & property)
471 {
472         std::vector<std::string> list;
473
474         for(auto src : mSources)
475         {
476                 if(contains(src->supported(), property))
477                         list.push_back(src->uuid());
478         }
479
480         return list;
481 }
482
483 void Core::inspectSupported()
484 {
485         for(AbstractSource* src : mSources)
486         {
487                 updateSupported(src->supported(), PropertyList(), src);
488         }
489 }
490
491 void Core::handleAddSupported(const PropertyList& added, AbstractSource* source)
492 {
493         if(!source)
494                 throw std::runtime_error("Core::handleAddSupported passed a null source");
495
496         if(!contains(mSources, source))
497         {
498                 mSources.insert(source);
499         }
500
501         for(auto property : added)
502         {
503                 if(!sourceForProperty(property, source->uuid()))
504                         mMasterPropertyList.emplace(source, property);
505
506                 // Subscribe to property in a new source if such property was subscribed. This catches newly supported properties in the process.
507                 if( propertySinkMap.find(property) != propertySinkMap.end()){
508                         source->subscribeToPropertyChanges(property);
509                 }
510         }
511 }
512
513 void Core::handleRemoveSupported(const PropertyList& removed, AbstractSource* source)
514 {
515         if(!source)
516                 return;
517
518         auto range = mMasterPropertyList.equal_range(source);
519         for(auto itr = removed.begin(); itr != removed.end(); ++itr)
520         {
521                 //
522                 // TODO: We do not have info about all subscribed sources in
523                 // std::unordered_map<VehicleProperty::Property, std::map<AbstractSink*, std::string> > propertySinkMap
524                 // so we do not know if we can/should remove property from propertySinkMap,
525                 // but I suppose this should be handled by each AbstractSink implementation in a callback AbstractSink::supportedChanged().
526
527                 const VehicleProperty::Property property(*itr);
528
529                 auto it = find_if(
530                                         range.first,    // the first property in source
531                                         range.second,   // one item right after the last property in source
532                                         [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
533                 );
534
535                 if (it != range.second)// property was found
536                 {
537                         mMasterPropertyList.erase(it);// References and iterators to the erased elements are invalidated. Other iterators and references are not invalidated.
538
539                         // TODO: Do we need to unsubscribe here ???
540                 }
541         }
542 }
543
544 AbstractSource* Core::sourceForProperty(const VehicleProperty::Property& property, const std::string& sourceUuidFilter) const
545 {
546         auto it = mMasterPropertyList.end();
547         if(sourceUuidFilter.empty()){
548                 it = std::find_if(mMasterPropertyList.begin(), mMasterPropertyList.end(),
549                                                   [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it) { return it.second == property; }
550                 );
551         }
552         else{
553                 auto itSource = find_if(mSources.begin(),mSources.end(),[&sourceUuidFilter](const std::set<AbstractSource*>::value_type & it)
554                 {
555                         return (it)->uuid() == sourceUuidFilter;
556                 });
557                 if(itSource != mSources.end()){
558                         auto range = mMasterPropertyList.equal_range(*itSource);
559                         auto temp = find_if(
560                                                 range.first,    // the first property in source
561                                                 range.second,   // one item right after the last property in source
562                                                 [&property](const std::multimap<AbstractSource*, VehicleProperty::Property>::value_type& it)
563                         {
564                                 return it.second == property; }
565                         );
566
567                         if (temp != range.second)// property was found
568                                 it = temp;
569                 }
570         }
571
572         if(it == mMasterPropertyList.end())
573                 return nullptr;
574         else
575                 return it->first;
576 }
577