fixed issue where database plugin was not properly exporting zones for supported...
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
1 #include "databasesink.h"
2 #include "abstractroutingengine.h"
3 #include "listplusplus.h"
4 #include "superptr.hpp"
5 #include "uuidhelper.h"
6 #include "ambplugin.h"
7
8 #include <thread>
9
/// Row count the writer thread's buffer must exceed before a batch is committed.
/// Overwritten from the "bufferLength" configuration key when present.
int bufferLength{100};
/// Pause, in milliseconds, taken by the writer thread before each queue pop.
/// Recomputed from the "frequency" configuration key when present.
int timeout{1000};
12
13 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
14 {
15         auto plugin = new AmbPlugin<DatabaseSink>(routingengine, config);
16         plugin->init();
17
18         return plugin;
19 }
20
21 void * cbFunc(Shared* shared)
22 {
23         if(!shared)
24         {
25                 throw std::runtime_error("Could not cast shared object.");
26         }
27
28         ///new tripID:
29         shared->tripId = amb::createUuid();
30
31         vector<DictionaryList<string> > insertList;
32
33         while(1)
34         {
35                 usleep(timeout*1000);
36
37                 DBObject obj = shared->queue.pop();
38
39                 if( obj.quit )
40                 {
41                         break;
42                 }
43
44                 DictionaryList<string> dict;
45
46                 NameValuePair<string> one("key", obj.key);
47                 NameValuePair<string> two("value", obj.value);
48                 NameValuePair<string> three("source", obj.source);
49                 NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj.zone));
50                 NameValuePair<string> four("time", boost::lexical_cast<string>(obj.time));
51                 NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj.sequence));
52                 NameValuePair<string> six("tripId", boost::lexical_cast<string>(shared->tripId));
53
54                 dict.push_back(one);
55                 dict.push_back(two);
56                 dict.push_back(three);
57                 dict.push_back(zone);
58                 dict.push_back(four);
59                 dict.push_back(five);
60                 dict.push_back(six);
61
62                 insertList.push_back(dict);
63
64                 if(insertList.size() > bufferLength)
65                 {
66                         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
67                         for(int i=0; i< insertList.size(); i++)
68                         {
69                                 DictionaryList<string> d = insertList[i];
70                                 shared->db->insert(d);
71                         }
72                         shared->db->exec("END TRANSACTION");
73                         insertList.clear();
74                 }
75                 //delete obj;
76         }
77
78         /// final flush of whatever is still in the queue:
79
80         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
81         for(int i=0; i< insertList.size(); i++)
82         {
83                 DictionaryList<string> d = insertList[i];
84                 shared->db->insert(d);
85         }
86         shared->db->exec("END TRANSACTION");
87
88         return NULL;
89 }
90
91 int getNextEvent(gpointer data)
92 {
93         PlaybackShared* pbshared = static_cast<PlaybackShared*>(data);
94
95         if(!pbshared)
96                 throw std::runtime_error("failed to cast PlaybackShared object");
97
98         if(pbshared->stop)
99                 return 0;
100
101         auto itr = pbshared->playbackQueue.begin();
102
103         if(itr == pbshared->playbackQueue.end())
104         {
105                 return 0;
106         }
107
108         DBObject obj = *itr;
109
110         auto value = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(obj.key, obj.value));
111
112         if(value)
113         {
114                 value->priority = AbstractPropertyType::Instant;
115                 value->timestamp = obj.time;
116                 value->sequence = obj.sequence;
117                 value->sourceUuid = obj.source;
118                 value->zone = obj.zone;
119                 pbshared->routingEngine->updateProperty(value.get(), pbshared->uuid);
120         }
121
122         if(++itr != pbshared->playbackQueue.end())
123         {
124                 DBObject o2 = *itr;
125                 double t = o2.time - obj.time;
126
127                 if(t > 0)
128                         g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
129                 else
130                         g_timeout_add(1, getNextEvent, pbshared);
131         }
132
133         pbshared->playbackQueue.remove(obj);
134         DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
135
136         return 0;
137 }
138
139 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config, AbstractSource &parent)
140         :AmbPluginImpl(engine, config, parent), shared(nullptr), playback(false), playbackShared(nullptr), playbackMultiplier(1)
141 {
142         databaseName = "storage";
143         tablename = "data";
144         tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, zone REAL, time REAL, sequence REAL, tripId TEXT)";
145
146         if(config.find("databaseFile") != config.end())
147         {
148                 setDatabaseFileName(config["databaseFile"]);
149         }
150
151         if(config.find("bufferLength") != config.end())
152         {
153                 bufferLength = atoi(config["bufferLength"].c_str());
154         }
155
156         if(config.find("frequency") != config.end())
157         {
158                 try
159                 {
160                         int t = boost::lexical_cast<int>(config["frequency"]);
161                         timeout = 1000 / t;
162                 }catch(...)
163                 {
164                         DebugOut(DebugOut::Error)<<"Failed to parse frequency: Invalid value "<<config["frequency"]<<endl;
165                 }
166
167
168         }
169
170         if(config.find("properties") != config.end())
171         {
172                 parseConfig();
173         }
174
175         for(auto itr : propertiesToSubscribeTo)
176         {
177                 engine->subscribeToProperty(itr, &parent);
178         }
179
180         addPropertySupport(Zone::None, [](){return new DatabaseFileType("storage");});
181         addPropertySupport(Zone::None, [](){return new DatabasePlaybackType(false);});
182         addPropertySupport(Zone::None, [](){return new DatabaseLoggingType(false);});
183
184         routingEngine->updateSupported(supported(), PropertyList(), &parent);
185
186         if(config.find("startOnLoad")!= config.end())
187         {
188                 setLogging(config["startOnLoad"] == "true");
189         }
190
191         if(config.find("playbackMultiplier")!= config.end())
192         {
193                 playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
194         }
195
196         if(config.find("playbackOnLoad")!= config.end())
197         {
198                 setPlayback(config["playbackOnLoad"] == "true");
199         }
200
201
202 }
203
204 DatabaseSink::~DatabaseSink()
205 {
206         if(shared)
207         {
208                 stopDb();
209         }
210
211         if(playbackShared)
212         {
213                 delete playbackShared;
214         }
215 }
216
217
218 void DatabaseSink::supportedChanged(const PropertyList &supportedProperties)
219 {
220
221 }
222
223 void DatabaseSink::parseConfig()
224 {
225         json_object *rootobject;
226         json_tokener *tokener = json_tokener_new();
227         enum json_tokener_error err;
228         do
229         {
230                 rootobject = json_tokener_parse_ex(tokener, configuration["properties"].c_str(),configuration["properties"].size());
231         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
232         if (err != json_tokener_success)
233         {
234                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
235         }
236         if (tokener->char_offset < configuration["properties"].size()) // XXX shouldn't access internal fields
237         {
238                 //Should handle the extra data here sometime...
239         }
240
241         json_object *propobject = json_object_object_get(rootobject,"properties");
242
243         g_assert(json_object_get_type(propobject) == json_type_array);
244
245         array_list *proplist = json_object_get_array(propobject);
246
247         for(int i=0; i < array_list_length(proplist); i++)
248         {
249                 json_object *idxobj = (json_object*)array_list_get_idx(proplist,i);
250                 std::string prop = json_object_get_string(idxobj);
251                 propertiesToSubscribeTo.push_back(prop);
252
253                 DebugOut()<<"DatabaseSink logging: "<<prop<<endl;
254         }
255
256         //json_object_put(propobject);
257         json_object_put(rootobject);
258 }
259
260 void DatabaseSink::stopDb()
261 {
262         if(!shared)
263                 return;
264
265         DBObject obj;
266         obj.quit = true;
267         shared->queue.append(obj);
268
269         thread.join();
270
271         delete shared;
272         shared = NULL;
273 }
274
275 void DatabaseSink::startDb()
276 {
277         if(playback.basicValue())
278         {
279                 DebugOut(0)<<"ERROR: tried to start logging during playback.  Only logging or playback can be used at one time"<<endl;
280                 return;
281         }
282
283         if(shared)
284         {
285                 DebugOut(0)<<"WARNING: logging already started.  doing nothing."<<endl;
286                 return;
287         }
288
289         initDb();
290
291         thread = std::thread(cbFunc, shared);
292 }
293
294 void DatabaseSink::startPlayback()
295 {
296         if(playback.basicValue())
297                 return;
298
299         playback = true;
300
301         initDb();
302
303         /// populate playback queue:
304
305         vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
306
307         /// we are done with shared.  clean up:
308         delete shared;
309         shared = NULL;
310
311         if(playbackShared)
312         {
313                 delete playbackShared;
314         }
315
316         playbackShared = new PlaybackShared(routingEngine, uuid(), playbackMultiplier);
317
318         for(int i=0;i<results.size();i++)
319         {
320                 if(results[i].size() < 5)
321                 {
322                         throw std::runtime_error("column mismatch in query");
323                 }
324
325                 DBObject obj;
326
327                 obj.key = results[i][0];
328                 obj.value = results[i][1];
329                 obj.source = results[i][2];
330                 obj.zone = boost::lexical_cast<double>(results[i][3]);
331                 obj.time = boost::lexical_cast<double>(results[i][4]);
332                 obj.sequence = boost::lexical_cast<double>(results[i][5]);
333
334                 playbackShared->playbackQueue.push_back(obj);
335         }
336
337         g_timeout_add(0, getNextEvent, playbackShared);
338 }
339
340 void DatabaseSink::initDb()
341 {
342         if(shared) delete shared;
343
344         shared = new Shared;
345         shared->db->init(databaseName.value<std::string>(), tablename, tablecreate);
346 }
347
348 void DatabaseSink::setPlayback(bool v)
349 {
350         AsyncSetPropertyRequest request;
351         request.property = DatabasePlayback;
352         request.value = new DatabasePlaybackType(v);
353
354         setProperty(request);
355 }
356
357 void DatabaseSink::setLogging(bool b)
358 {
359         databaseLogging = b;
360         AsyncSetPropertyRequest request;
361         request.property = DatabaseLogging;
362         request.value = &databaseLogging;
363
364         setProperty(request);
365 }
366
367 void DatabaseSink::setDatabaseFileName(string filename)
368 {
369         databaseName = filename;
370
371         initDb();
372
373         vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
374
375         for(int i=0; i < supportedStr.size(); i++)
376         {
377                 if(!contains(supported(), supportedStr[i][0]))
378                 {
379                         std::string name = supportedStr[i][0];
380                         Zone::Type zone = boost::lexical_cast<int, std::string>(supportedStr[i][3]);
381                         addPropertySupport(zone, [name]() { return VehicleProperty::getPropertyTypeForPropertyNameValue(name); });
382                 }
383         }
384
385         delete shared;
386         shared = NULL;
387
388         routingEngine->updateSupported(mSupported, PropertyList(), &source);
389 }
390
391 void DatabaseSink::propertyChanged(AbstractPropertyType *value)
392 {
393         VehicleProperty::Property property = value->name;
394
395         if(!shared)
396                 return;
397
398         if(!contains(supported(), property))
399         {
400                 addPropertySupport(value->zone, [property]() { return VehicleProperty::getPropertyTypeForPropertyNameValue(property);});
401                 routingEngine->updateSupported(mSupported, PropertyList(), &source);
402         }
403
404         DBObject obj;
405         obj.key = property;
406         obj.value = value->toString();
407         obj.source = value->sourceUuid;
408         obj.time = value->timestamp;
409         obj.sequence = value->sequence;
410         obj.zone = value->zone;
411
412         shared->queue.append(obj);
413 }
414
415
416 const std::string DatabaseSink::uuid() const
417 {
418         return "9f88156e-cb92-4472-8775-9c08addf50d3";
419 }
420
421 void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
422 {
423         reply->success = false;
424
425         if(reply->property == DatabaseFile)
426         {
427                 DatabaseFileType temp(databaseName);
428                 reply->value = &temp;
429
430                 reply->success = true;
431                 reply->completed(reply);
432
433                 return;
434         }
435         else if(reply->property == DatabaseLogging)
436         {
437                 databaseLogging = shared != nullptr;
438
439                 reply->value = &databaseLogging;
440                 reply->success = true;
441                 reply->completed(reply);
442
443                 return;
444         }
445
446         else if(reply->property == DatabasePlayback)
447         {
448                 DatabasePlaybackType temp = playback;
449                 reply->value = &temp;
450                 reply->success = true;
451                 reply->completed(reply);
452
453                 return;
454         }
455
456         reply->completed(reply);
457 }
458
459 void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
460 {
461         BaseDB * db = new BaseDB();
462         db->init(databaseName.value<std::string>(), tablename, tablecreate);
463
464         ostringstream query;
465         query.precision(15);
466
467         query<<"SELECT * from "<<tablename<<" WHERE (";
468
469         for(auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
470         {
471                 if(itr != reply->properties.begin())
472                         query<<" OR ";
473
474                 query<<"key='"<<(*itr)<<"'";
475         }
476
477         query<<") AND";
478
479         if(reply->timeBegin && reply->timeEnd)
480         {
481                 query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
482         }
483
484         if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
485         {
486                 query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
487         }
488
489         if(reply->sourceUuid != "")
490                 query<<" AND source='"<<reply->sourceUuid<<"'";
491
492         query<<" AND zone="<<reply->zone;
493
494         std::vector<std::vector<string>> data = db->select(query.str());
495
496         DebugOut()<<"Dataset size "<<data.size()<<endl;
497
498         for(auto i=0;i<data.size();i++)
499         {
500                 if(data[i].size() != 6)
501                         continue;
502
503                 DBObject dbobj;
504                 dbobj.key = data[i][0];
505                 dbobj.value = data[i][1];
506                 dbobj.source = data[i][2];
507                 dbobj.zone = boost::lexical_cast<double>(data[i][3]);
508                 dbobj.time = boost::lexical_cast<double>(data[i][4]);
509                 dbobj.sequence = boost::lexical_cast<double>(data[i][5]);
510
511                 AbstractPropertyType* property = VehicleProperty::getPropertyTypeForPropertyNameValue(dbobj.key, dbobj.value);
512                 if(property)
513                 {
514                         property->timestamp = dbobj.time;
515                         property->sequence = dbobj.sequence;
516
517                         reply->values.push_back(property);
518                 }
519         }
520
521         reply->success = true;
522         reply->completed(reply);
523
524         delete db;
525 }
526
527 AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
528 {
529         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
530         reply->success = false;
531
532         if(request.property == DatabaseLogging)
533         {
534                 if(request.value->value<bool>())
535                 {
536                         setPlayback(false);
537                         startDb();
538                         reply->success = true;
539                         databaseLogging = true;
540                         routingEngine->updateProperty(&databaseLogging,uuid());
541                 }
542                 else
543                 {
544                         stopDb();
545                         reply->success = true;
546                         databaseLogging = false;
547                         routingEngine->updateProperty(&databaseLogging,uuid());
548                 }
549         }
550
551         else if(request.property == DatabaseFile)
552         {
553                 std::string fname = request.value->toString();
554
555                 databaseName = fname;
556
557                 routingEngine->updateProperty(&databaseName,uuid());
558
559                 reply->success = true;
560         }
561         else if( request.property == DatabasePlayback)
562         {
563                 if(request.value->value<bool>())
564                 {
565                         setLogging(false);
566                         startPlayback();
567
568                         routingEngine->updateProperty(&playback,uuid());
569                 }
570                 else
571                 {
572                         if(playbackShared)
573                                 playbackShared->stop = true;
574
575                         playback = false;
576
577                         routingEngine->updateProperty(&playback,uuid());
578                 }
579
580                 reply->success = true;
581         }
582
583         return reply;
584 }
585
586 void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
587 {
588
589 }
590
591 void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
592 {
593 }