Fix all plugins; fix websocket plugins
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
1 #include "databasesink.h"
2 #include "abstractroutingengine.h"
3 #include "listplusplus.h"
4 #include "superptr.hpp"
5 #include "uuidhelper.h"
6 #include "ambplugin.h"
7
8 #include <thread>
9
/// Row count the writer thread's buffer must exceed before a batch is committed.
int bufferLength{100};
/// Pause, in milliseconds, the writer thread takes before each queue pop.
int timeout{1000};
12
13 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
14 {
15         auto plugin = new AmbPlugin<DatabaseSink>(routingengine, config);
16         plugin->init();
17
18         return plugin;
19 }
20
21 void * cbFunc(Shared* shared)
22 {
23         if(!shared)
24         {
25                 throw std::runtime_error("Could not cast shared object.");
26         }
27
28         ///new tripID:
29         shared->tripId = amb::createUuid();
30
31         vector<DictionaryList<string> > insertList;
32
33         while(1)
34         {
35                 usleep(timeout*1000);
36
37                 DBObject obj = shared->queue.pop();
38
39                 if( obj.quit )
40                 {
41                         break;
42                 }
43
44                 DictionaryList<string> dict;
45
46                 NameValuePair<string> one("key", obj.key);
47                 NameValuePair<string> two("value", obj.value);
48                 NameValuePair<string> three("source", obj.source);
49                 NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj.zone));
50                 NameValuePair<string> four("time", boost::lexical_cast<string>(obj.time));
51                 NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj.sequence));
52                 NameValuePair<string> six("tripId", boost::lexical_cast<string>(shared->tripId));
53
54                 dict.push_back(one);
55                 dict.push_back(two);
56                 dict.push_back(three);
57                 dict.push_back(zone);
58                 dict.push_back(four);
59                 dict.push_back(five);
60                 dict.push_back(six);
61
62                 insertList.push_back(dict);
63
64                 if(insertList.size() > bufferLength)
65                 {
66                         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
67                         for(int i=0; i< insertList.size(); i++)
68                         {
69                                 DictionaryList<string> d = insertList[i];
70                                 shared->db->insert(d);
71                         }
72                         shared->db->exec("END TRANSACTION");
73                         insertList.clear();
74                 }
75                 //delete obj;
76         }
77
78         /// final flush of whatever is still in the queue:
79
80         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
81         for(int i=0; i< insertList.size(); i++)
82         {
83                 DictionaryList<string> d = insertList[i];
84                 shared->db->insert(d);
85         }
86         shared->db->exec("END TRANSACTION");
87
88         return NULL;
89 }
90
91 int getNextEvent(gpointer data)
92 {
93         PlaybackShared* pbshared = static_cast<PlaybackShared*>(data);
94
95         if(!pbshared)
96                 throw std::runtime_error("failed to cast PlaybackShared object");
97
98         if(pbshared->stop)
99                 return 0;
100
101         auto itr = pbshared->playbackQueue.begin();
102
103         if(itr == pbshared->playbackQueue.end())
104         {
105                 return 0;
106         }
107
108         DBObject obj = *itr;
109
110         auto value = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(obj.key, obj.value));
111
112         if(value)
113         {
114                 value->priority = AbstractPropertyType::Instant;
115                 value->timestamp = obj.time;
116                 value->sequence = obj.sequence;
117                 value->sourceUuid = obj.source;
118                 value->zone = obj.zone;
119                 pbshared->routingEngine->updateProperty(value.get(), pbshared->uuid);
120         }
121
122         if(++itr != pbshared->playbackQueue.end())
123         {
124                 DBObject o2 = *itr;
125                 double t = o2.time - obj.time;
126
127                 if(t > 0)
128                         g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
129                 else
130                         g_timeout_add(1, getNextEvent, pbshared);
131         }
132
133         pbshared->playbackQueue.remove(obj);
134         DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
135
136         return 0;
137 }
138
139 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config, AbstractSource &parent)
140         :AmbPluginImpl(engine, config, parent), shared(nullptr), playback(false), playbackShared(nullptr), playbackMultiplier(1)
141 {
142         databaseName = "storage";
143         tablename = "data";
144         tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, zone INTEGER, time REAL, sequence REAL, tripId TEXT)";
145
146         if(config.find("bufferLength") != config.end())
147         {
148                 bufferLength = atoi(config["bufferLength"].c_str());
149         }
150
151         if(config.find("frequency") != config.end())
152         {
153                 try
154                 {
155                         int t = boost::lexical_cast<int>(config["frequency"]);
156                         timeout = 1000 / t;
157                 }catch(...)
158                 {
159                         DebugOut(DebugOut::Error)<<"Failed to parse frequency: Invalid value "<<config["frequency"]<<endl;
160                 }
161
162
163         }
164
165         if(config.find("properties") != config.end())
166         {
167                 parseConfig();
168         }
169
170         for(auto itr : propertiesToSubscribeTo)
171         {
172                 engine->subscribeToProperty(itr, &parent);
173         }
174
175         addPropertySupport(Zone::None, [](){ return new DatabaseFileType("storage"); });
176         addPropertySupport(Zone::None, [](){ return new DatabasePlaybackType(false); });
177         addPropertySupport(Zone::None, [](){ return new DatabaseLoggingType(false); });
178
179         if(config.find("startOnLoad")!= config.end())
180         {
181                 setLogging(config["startOnLoad"] == "true");
182         }
183
184         if(config.find("playbackMultiplier")!= config.end())
185         {
186                 playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
187         }
188
189         if(config.find("playbackOnLoad")!= config.end())
190         {
191                 setPlayback(config["playbackOnLoad"] == "true");
192         }
193 }
194
195 DatabaseSink::~DatabaseSink()
196 {
197         if(shared)
198         {
199                 stopDb();
200         }
201
202         if(playbackShared)
203         {
204                 delete playbackShared;
205         }
206 }
207
208
209 void DatabaseSink::supportedChanged(const PropertyList &supportedProperties)
210 {
211
212 }
213
214 void DatabaseSink::parseConfig()
215 {
216         json_object *rootobject;
217         json_tokener *tokener = json_tokener_new();
218         enum json_tokener_error err;
219         do
220         {
221                 rootobject = json_tokener_parse_ex(tokener, configuration["properties"].c_str(),configuration["properties"].size());
222         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
223         if (err != json_tokener_success)
224         {
225                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
226         }
227         if (tokener->char_offset < configuration["properties"].size()) // XXX shouldn't access internal fields
228         {
229                 //Should handle the extra data here sometime...
230         }
231
232         json_object *propobject = json_object_object_get(rootobject,"properties");
233
234         g_assert(json_object_get_type(propobject) == json_type_array);
235
236         array_list *proplist = json_object_get_array(propobject);
237
238         for(int i=0; i < array_list_length(proplist); i++)
239         {
240                 json_object *idxobj = (json_object*)array_list_get_idx(proplist,i);
241                 std::string prop = json_object_get_string(idxobj);
242                 propertiesToSubscribeTo.push_back(prop);
243
244                 DebugOut()<<"DatabaseSink logging: "<<prop<<endl;
245         }
246
247         //json_object_put(propobject);
248         json_object_put(rootobject);
249 }
250
251 void DatabaseSink::stopDb()
252 {
253         if(!shared)
254                 return;
255
256         DBObject obj;
257         obj.quit = true;
258         shared->queue.append(obj);
259
260         thread.join();
261
262         delete shared;
263         shared = NULL;
264 }
265
266 void DatabaseSink::startDb()
267 {
268         if(playback.basicValue())
269         {
270                 DebugOut(0)<<"ERROR: tried to start logging during playback.  Only logging or playback can be used at one time"<<endl;
271                 return;
272         }
273
274         if(shared)
275         {
276                 DebugOut(0)<<"WARNING: logging already started.  doing nothing."<<endl;
277                 return;
278         }
279
280         initDb();
281
282         thread = std::thread(cbFunc, shared);
283 }
284
285 void DatabaseSink::startPlayback()
286 {
287         if(playback.basicValue())
288                 return;
289
290         playback = true;
291
292         initDb();
293
294         /// populate playback queue:
295
296         vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
297
298         /// we are done with shared.  clean up:
299         delete shared;
300         shared = NULL;
301
302         if(playbackShared)
303         {
304                 delete playbackShared;
305         }
306
307         playbackShared = new PlaybackShared(routingEngine, uuid(), playbackMultiplier);
308
309         for(int i=0;i<results.size();i++)
310         {
311                 if(results[i].size() < 5)
312                 {
313                         throw std::runtime_error("column mismatch in query");
314                 }
315
316                 DBObject obj;
317
318                 obj.key = results[i][0];
319                 obj.value = results[i][1];
320                 obj.source = results[i][2];
321                 obj.zone = boost::lexical_cast<int>(results[i][3]);
322                 obj.time = boost::lexical_cast<double>(results[i][4]);
323                 obj.sequence = boost::lexical_cast<double>(results[i][5]);
324
325                 playbackShared->playbackQueue.push_back(obj);
326         }
327
328         g_timeout_add(0, getNextEvent, playbackShared);
329 }
330
331 void DatabaseSink::initDb()
332 {
333         if(shared) delete shared;
334
335         shared = new Shared;
336         shared->db->init(databaseName.value<std::string>(), tablename, tablecreate);
337 }
338
339 void DatabaseSink::setPlayback(bool v)
340 {
341         AsyncSetPropertyRequest request;
342         request.property = DatabasePlayback;
343         request.value = new DatabasePlaybackType(v);
344
345         setProperty(request);
346 }
347
348 void DatabaseSink::setLogging(bool b)
349 {
350         databaseLogging = b;
351         AsyncSetPropertyRequest request;
352         request.property = DatabaseLogging;
353         request.value = &databaseLogging;
354
355         setProperty(request);
356 }
357
358 void DatabaseSink::setDatabaseFileName(string filename)
359 {
360         databaseName = filename;
361
362         initDb();
363
364         vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key, zone FROM "+tablename);
365
366         for(int i=0; i < supportedStr.size(); i++)
367         {
368                 std::string name = supportedStr[i][0];
369
370                 if(!contains(supported(), name))
371                 {
372                         std::string zoneStr = supportedStr[i][1];
373
374                         DebugOut() << "adding property " << name << " in zone: " << zoneStr << endl;
375
376                         Zone::Type zone = boost::lexical_cast<Zone::Type>(zoneStr);
377                         addPropertySupport(zone, [name]() { return VehicleProperty::getPropertyTypeForPropertyNameValue(name); });
378                 }
379         }
380
381         delete shared;
382         shared = NULL;
383
384         routingEngine->updateSupported(supported(), PropertyList(), &source);
385 }
386
387 void DatabaseSink::propertyChanged(AbstractPropertyType *value)
388 {
389         VehicleProperty::Property property = value->name;
390
391         if(!shared)
392                 return;
393
394         if(!contains(supported(), property))
395         {
396                 addPropertySupport(value->zone, [property]() { return VehicleProperty::getPropertyTypeForPropertyNameValue(property);});
397                 routingEngine->updateSupported(supported(), PropertyList(), &source);
398         }
399
400         DBObject obj;
401         obj.key = property;
402         obj.value = value->toString();
403         obj.source = value->sourceUuid;
404         obj.time = value->timestamp;
405         obj.sequence = value->sequence;
406         obj.zone = value->zone;
407
408         shared->queue.append(obj);
409 }
410
411
412 const std::string DatabaseSink::uuid() const
413 {
414         return "9f88156e-cb92-4472-8775-9c08addf50d3";
415 }
416
417 void DatabaseSink::init()
418 {
419         if(configuration.find("databaseFile") != configuration.end())
420         {
421                 setDatabaseFileName(configuration["databaseFile"]);
422         }
423
424         routingEngine->updateSupported(supported(), PropertyList(), &source);
425 }
426
427 void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
428 {
429         reply->success = false;
430
431         if(reply->property == DatabaseFile)
432         {
433                 DatabaseFileType temp(databaseName);
434                 reply->value = &temp;
435
436                 reply->success = true;
437                 reply->completed(reply);
438
439                 return;
440         }
441         else if(reply->property == DatabaseLogging)
442         {
443                 databaseLogging = shared != nullptr;
444
445                 reply->value = &databaseLogging;
446                 reply->success = true;
447                 reply->completed(reply);
448
449                 return;
450         }
451
452         else if(reply->property == DatabasePlayback)
453         {
454                 DatabasePlaybackType temp = playback;
455                 reply->value = &temp;
456                 reply->success = true;
457                 reply->completed(reply);
458
459                 return;
460         }
461
462         reply->completed(reply);
463 }
464
465 void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
466 {
467         BaseDB * db = new BaseDB();
468         db->init(databaseName.value<std::string>(), tablename, tablecreate);
469
470         ostringstream query;
471         query.precision(15);
472
473         query<<"SELECT * from "<<tablename<<" WHERE (";
474
475         for(auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
476         {
477                 if(itr != reply->properties.begin())
478                         query<<" OR ";
479
480                 query<<"key='"<<(*itr)<<"'";
481         }
482
483         query<<") AND";
484
485         if(reply->timeBegin && reply->timeEnd)
486         {
487                 query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
488         }
489
490         if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
491         {
492                 query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
493         }
494
495         if(reply->sourceUuid != "")
496                 query<<" AND source='"<<reply->sourceUuid<<"'";
497
498         query<<" AND zone="<<reply->zone;
499
500         std::vector<std::vector<string>> data = db->select(query.str());
501
502         DebugOut()<<"Dataset size "<<data.size()<<endl;
503
504         for(auto i=0;i<data.size();i++)
505         {
506                 if(data[i].size() != 6)
507                         continue;
508
509                 DBObject dbobj;
510                 dbobj.key = data[i][0];
511                 dbobj.value = data[i][1];
512                 dbobj.source = data[i][2];
513                 dbobj.zone = boost::lexical_cast<double>(data[i][3]);
514                 dbobj.time = boost::lexical_cast<double>(data[i][4]);
515                 dbobj.sequence = boost::lexical_cast<double>(data[i][5]);
516
517                 AbstractPropertyType* property = VehicleProperty::getPropertyTypeForPropertyNameValue(dbobj.key, dbobj.value);
518                 if(property)
519                 {
520                         property->timestamp = dbobj.time;
521                         property->sequence = dbobj.sequence;
522
523                         reply->values.push_back(property);
524                 }
525         }
526
527         reply->success = true;
528         reply->completed(reply);
529
530         delete db;
531 }
532
533 AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
534 {
535         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
536         reply->success = false;
537
538         if(request.property == DatabaseLogging)
539         {
540                 if(request.value->value<bool>())
541                 {
542                         setPlayback(false);
543                         startDb();
544                         reply->success = true;
545                         databaseLogging = true;
546                         routingEngine->updateProperty(&databaseLogging,uuid());
547                 }
548                 else
549                 {
550                         stopDb();
551                         reply->success = true;
552                         databaseLogging = false;
553                         routingEngine->updateProperty(&databaseLogging,uuid());
554                 }
555         }
556
557         else if(request.property == DatabaseFile)
558         {
559                 std::string fname = request.value->toString();
560
561                 databaseName = fname;
562
563                 routingEngine->updateProperty(&databaseName,uuid());
564
565                 reply->success = true;
566         }
567         else if( request.property == DatabasePlayback)
568         {
569                 if(request.value->value<bool>())
570                 {
571                         setLogging(false);
572                         startPlayback();
573
574                         routingEngine->updateProperty(&playback,uuid());
575                 }
576                 else
577                 {
578                         if(playbackShared)
579                                 playbackShared->stop = true;
580
581                         playback = false;
582
583                         routingEngine->updateProperty(&playback,uuid());
584                 }
585
586                 reply->success = true;
587         }
588
589         return reply;
590 }
591
592 void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
593 {
594
595 }
596
597 void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
598 {
599 }
600