flush left over data in database
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
1 #include "databasesink.h"
2 #include "abstractroutingengine.h"
3 #include "listplusplus.h"
4
5 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
6 {
7         return new DatabaseSinkManager(routingengine, config);
8 }
9
10 void * cbFunc(gpointer data)
11 {
12         Shared *shared = static_cast<Shared*>(data);
13
14         if(!shared)
15         {
16                 throw std::runtime_error("Could not cast shared object.");
17         }
18
19         vector<DictionaryList<string> > insertList;
20
21         while(1)
22         {
23                 DBObject* obj = shared->queue.pop();
24
25                 if( obj->quit )
26                 {
27                         delete obj;
28                         break;
29                 }
30
31                 DictionaryList<string> dict;
32
33                 NameValuePair<string> one("key", obj->key);
34                 NameValuePair<string> two("value", obj->value);
35                 NameValuePair<string> three("source", obj->source);
36                 NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time));
37                 NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence));
38
39                 dict.push_back(one);
40                 dict.push_back(two);
41                 dict.push_back(three);
42                 dict.push_back(four);
43                 dict.push_back(five);
44
45                 insertList.push_back(dict);
46
47                 if(insertList.size() > 100)
48                 {
49                         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
50                         for(int i=0; i< insertList.size(); i++)
51                         {
52                                 DictionaryList<string> d = insertList[i];
53                                 shared->db->insert(d);
54                         }
55                         shared->db->exec("END TRANSACTION");
56                         insertList.clear();
57                 }
58                 delete obj;
59         }
60
61         /// final flush of whatever is still in the queue:
62
63         shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
64         for(int i=0; i< insertList.size(); i++)
65         {
66                 DictionaryList<string> d = insertList[i];
67                 shared->db->insert(d);
68         }
69         shared->db->exec("END TRANSACTION");
70
71         return NULL;
72 }
73
74 int getNextEvent(gpointer data)
75 {
76         PlaybackShared* pbshared = static_cast<PlaybackShared*>(data);
77
78         if(!pbshared)
79                 throw std::runtime_error("failed to cast PlaybackShared object");
80
81         auto itr = pbshared->playbackQueue.begin();
82
83         if(itr == pbshared->playbackQueue.end())
84         {
85                 return 0;
86         }
87
88         DBObject* obj = *itr;
89
90         AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value);
91
92         if(value)
93         {
94                 pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid);
95                 value->timestamp = obj->time;
96                 //value->sequence = obj->sequence;
97         }
98
99         if(++itr != pbshared->playbackQueue.end())
100         {
101                 DBObject *o2 = *itr;
102                 double t = o2->time - obj->time;
103
104                 if(t > 0)
105                         g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
106                 else
107                         g_timeout_add(t, getNextEvent, pbshared);
108         }
109
110         pbshared->playbackQueue.remove(obj);
111         delete obj;
112
113         return 0;
114 }
115
116 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
117         :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL), playbackMultiplier(1)
118 {
119         databaseName = "storage";
120         tablename = "data";
121         tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
122
123         if(config.find("databaseFile") != config.end())
124         {
125                 databaseName = config["databaseFile"];
126         }
127
128         if(config.find("properties") != config.end())
129         {
130                 parseConfig();
131         }
132
133         for(auto itr=propertiesToSubscribeTo.begin();itr!=propertiesToSubscribeTo.end();itr++)
134         {
135                 engine->subscribeToProperty(*itr,this);
136         }
137
138         mSupported.push_back(DatabaseFileProperty);
139         mSupported.push_back(DatabaseLoggingProperty);
140         mSupported.push_back(DatabasePlaybackProperty);
141
142         /// get supported:
143
144         initDb();
145
146         vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
147
148         for(int i=0; i < supportedStr.size(); i++)
149         {
150                 if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
151                         mSupported.push_back(supportedStr[i][0]);
152         }
153
154         delete shared;
155         shared = NULL;
156
157         routingEngine->setSupported(supported(), this);
158
159         if(config.find("startOnLoad")!= config.end())
160         {
161                 AsyncSetPropertyRequest request;
162                 request.property = DatabaseLoggingProperty;
163                 request.value = new BasicPropertyType<bool>(true);
164
165                 setProperty(request);
166         }
167
168         if(config.find("playbackMultiplier")!= config.end())
169         {
170                 playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
171         }
172
173         if(config.find("playbackOnLoad")!= config.end())
174         {
175                 startPlayback();
176         }
177
178
179 }
180
181 DatabaseSink::~DatabaseSink()
182 {
183         if(shared)
184         {
185                 DBObject* obj = new DBObject();
186                 obj->quit = true;
187
188                 shared->queue.append(obj);
189
190                 g_thread_join(thread);
191                 g_thread_unref(thread);
192                 delete shared;
193         }
194
195         if(playbackShared)
196         {
197                 delete playbackShared;
198         }
199 }
200
201
202 void DatabaseSink::supportedChanged(PropertyList supportedProperties)
203 {
204
205 }
206
207 PropertyList DatabaseSink::supported()
208 {
209         return mSupported;
210 }
211
212 void DatabaseSink::parseConfig()
213 {
214         json_object *rootobject;
215         json_tokener *tokener = json_tokener_new();
216         enum json_tokener_error err;
217         do
218         {
219                 rootobject = json_tokener_parse_ex(tokener, configuration["properties"].c_str(),configuration["properties"].size());
220         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
221         if (err != json_tokener_success)
222         {
223                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
224         }
225         if (tokener->char_offset < configuration["properties"].size()) // XXX shouldn't access internal fields
226         {
227                 //Should handle the extra data here sometime...
228         }
229         
230         json_object *propobject = json_object_object_get(rootobject,"properties");
231         
232         g_assert(json_object_get_type(propobject) == json_type_array);
233
234         array_list *proplist = json_object_get_array(propobject);
235         
236         for(int i=0; i < array_list_length(proplist); i++)
237         {
238                 json_object *idxobj = (json_object*)array_list_get_idx(proplist,i);
239                 std::string prop = json_object_get_string(idxobj);
240                 propertiesToSubscribeTo.push_back(prop);
241
242                 DebugOut()<<"DatabaseSink logging: "<<prop<<endl;
243         }
244
245         json_object_put(propobject);
246         json_object_put(rootobject);
247 }
248
249 void DatabaseSink::stopDb()
250 {
251         if(!shared)
252                 return;
253
254         DBObject *obj = new DBObject();
255         obj->quit = true;
256         shared->queue.append(obj);
257
258         g_thread_join(thread);
259
260         delete shared;
261         shared = NULL;
262 }
263
264 void DatabaseSink::startDb()
265 {
266         if(playback)
267         {
268                 DebugOut(0)<<"ERROR: tried to start logging during playback.  Only logging or playback can be used at one time"<<endl;
269                 return;
270         }
271
272         if(shared)
273         {
274                 DebugOut(0)<<"WARNING: logging already started.  doing nothing."<<endl;
275                 return;
276         }
277
278         initDb();
279
280         thread = g_thread_new("dbthread", cbFunc, shared);
281 }
282
283 void DatabaseSink::startPlayback()
284 {
285         if(playback)
286                 return;
287
288         playback = true;
289
290         initDb();
291
292         /// populate playback queue:
293
294         vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
295
296         if(playbackShared)
297         {
298                 delete playbackShared;
299         }
300
301         playbackShared = new PlaybackShared(routingEngine, uuid(), playbackMultiplier);
302
303         for(int i=0;i<results.size();i++)
304         {
305                 if(results[i].size() < 5)
306                 {
307                         throw std::runtime_error("column mismatch in query");
308                 }
309
310                 DBObject* obj = new DBObject();
311
312                 obj->key = results[i][0];
313                 obj->value = results[i][1];
314                 obj->source = results[i][2];
315                 obj->time = boost::lexical_cast<double>(results[i][3]);
316 //              obj->sequence = boost::lexical_cast<int>(results[i][4]);
317
318                 playbackShared->playbackQueue.push_back(obj);
319         }
320
321         g_timeout_add(0,getNextEvent,playbackShared);
322 }
323
324 void DatabaseSink::initDb()
325 {
326         if(shared) delete shared;
327
328         shared = new Shared;
329         shared->db->init(databaseName, tablename, tablecreate);
330 }
331
332 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
333 {
334         if(!shared)
335                 return;
336
337         DBObject* obj = new DBObject;
338         obj->key = property;
339         obj->value = value->toString();
340         obj->source = uuid;
341         obj->time = value->timestamp;
342         obj->sequence = value->sequence;
343
344         shared->queue.append(obj);
345 }
346
347
348 std::string DatabaseSink::uuid()
349 {
350         return "9f88156e-cb92-4472-8775-9c08addf50d3";
351 }
352
353 void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
354 {
355         reply->success = false;
356
357         if(reply->property == DatabaseFileProperty)
358         {
359                 DatabaseFilePropertyType temp(databaseName);
360                 reply->value = &temp;
361
362                 reply->success = true;
363                 reply->completed(reply);
364
365                 return;
366         }
367         else if(reply->property == DatabaseLoggingProperty)
368         {
369                 BasicPropertyType<bool> temp = shared;
370
371                 reply->value = &temp;
372                 reply->success = true;
373                 reply->completed(reply);
374
375                 return;
376         }
377
378         else if(reply->property == DatabasePlaybackProperty)
379         {
380                 BasicPropertyType<bool> temp = playback;
381                 reply->value = &temp;
382                 reply->success = true;
383                 reply->completed(reply);
384
385                 return;
386         }
387
388         reply->completed(reply);
389 }
390
391 void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
392 {
393         BaseDB * db = new BaseDB();
394         db->init(databaseName, tablename, tablecreate);
395
396         ostringstream query;
397         query.precision(15);
398
399         query<<"SELECT * from "<<tablename<<" WHERE ";
400
401         if(reply->timeBegin && reply->timeEnd)
402         {
403                 query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
404         }
405
406         if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
407         {
408                 query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
409         }
410
411         std::vector<std::vector<string>> data = db->select(query.str());
412
413         std::list<AbstractPropertyType*> cleanup;
414
415         for(auto i=0;i<data.size();i++)
416         {
417                 if(data[i].size() != 5)
418                         continue;
419
420                 DBObject dbobj;
421                 dbobj.key = data[i][0];
422                 dbobj.value = data[i][1];
423                 dbobj.source = data[i][2];
424                 dbobj.time = boost::lexical_cast<double>(data[i][3]);
425                 dbobj.sequence = boost::lexical_cast<double>(data[i][4]);
426
427                 AbstractPropertyType* property = VehicleProperty::getPropertyTypeForPropertyNameValue(dbobj.key, dbobj.value);
428                 if(property)
429                 {
430                         property->timestamp = dbobj.time;
431                         property->sequence = dbobj.sequence;
432
433                         reply->values.push_back(property);
434                         cleanup.push_back(property);
435                 }
436         }
437
438         reply->success = true;
439         reply->completed(reply);
440
441         delete db;
442 }
443
444 AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
445 {
446         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
447         reply->success = false;
448
449         if(request.property == DatabaseLoggingProperty)
450         {
451                 if(request.value->value<bool>())
452                 {
453                         ///TODO: start or stop logging thread
454                         startDb();
455                         reply->success = true;
456                         BasicPropertyType<bool> temp(true);
457                         routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
458                 }
459                 else
460                 {
461                         stopDb();
462                         reply->success = true;
463                         BasicPropertyType<bool> temp(false);
464                         routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
465                 }
466         }
467
468         else if(request.property == DatabaseFileProperty)
469         {
470                 std::string fname = request.value->toString();
471
472                 databaseName = fname;
473
474                 StringPropertyType temp(databaseName);
475
476                 routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
477
478                 reply->success = true;
479         }
480         else if( request.property == DatabasePlaybackProperty)
481         {
482                 if(request.value->value<bool>())
483                 {
484                         startPlayback();
485
486                         BasicPropertyType<bool> temp(true);
487
488                         routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
489                 }
490                 else
491                 {
492                         /// TODO: stop playback
493
494                         BasicPropertyType<bool> temp(true);
495
496                         routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
497                 }
498
499                 reply->success = true;
500         }
501
502         return reply;
503 }
504
505 void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
506 {
507
508 }
509
510 void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
511 {
512 }