fixed merge
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
1 #include "databasesink.h"
2 #include "abstractroutingengine.h"
3 #include "listplusplus.h"
4
5 #include <json-glib/json-glib.h>
6
7 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
8 {
9         return new DatabaseSinkManager(routingengine, config);
10 }
11
12 void * cbFunc(gpointer data)
13 {
14         Shared *shared = static_cast<Shared*>(data);
15
16         if(!shared)
17         {
18                 throw std::runtime_error("Could not cast shared object.");
19         }
20
21         while(1)
22         {
23                 DBObject* obj = shared->queue.pop();
24
25                 if( obj->quit )
26                 {
27                         delete obj;
28                         break;
29                 }
30
31                 DictionaryList<string> dict;
32
33                 NameValuePair<string> one("key", obj->key);
34                 NameValuePair<string> two("value", obj->value);
35                 NameValuePair<string> three("source", obj->source);
36                 NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time));
37                 NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence));
38
39                 dict.push_back(one);
40                 dict.push_back(two);
41                 dict.push_back(three);
42                 dict.push_back(four);
43                 dict.push_back(five);
44
45                 shared->db->insert(dict);
46                 delete obj;
47         }
48
49         return NULL;
50 }
51
52 int getNextEvent(gpointer data)
53 {
54         PlaybackShared* pbshared = static_cast<PlaybackShared*>(data);
55
56         if(!pbshared)
57                 throw std::runtime_error("failed to cast PlaybackShared object");
58
59         auto itr = pbshared->playbackQueue.begin();
60
61         if(itr == pbshared->playbackQueue.end())
62         {
63                 return 0;
64         }
65
66         DBObject* obj = *itr;
67
68         AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value);
69
70         if(value)
71         {
72                 pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid);
73                 value->timestamp = obj->time;
74                 value->sequence = obj->sequence;
75         }
76
77         if(++itr != pbshared->playbackQueue.end())
78         {
79                 DBObject *o2 = *itr;
80                 double t = o2->time - obj->time;
81
82                 if(t > 0)
83                         g_timeout_add(t*1000, getNextEvent, pbshared);
84                 else
85                         g_timeout_add(t, getNextEvent, pbshared);
86         }
87
88         pbshared->playbackQueue.remove(obj);
89         delete obj;
90
91         return 0;
92 }
93
94 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
95         :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL)
96 {
97         databaseName = "storage";
98         tablename = "data";
99         tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
100
101         //startDb();
102
103         if(config.find("startOnLoad")!= config.end())
104         {
105                 startDb();
106         }
107
108         parseConfig();
109
110         for(auto itr=propertiesToSubscribeTo.begin();itr!=propertiesToSubscribeTo.end();itr++)
111         {
112                 engine->subscribeToProperty(*itr,this);
113         }
114
115         mSupported.push_back(DatabaseFileProperty);
116         mSupported.push_back(DatabaseLoggingProperty);
117         mSupported.push_back(DatabasePlaybackProperty);
118
119         routingEngine->setSupported(mSupported,this);
120
121 }
122
123 DatabaseSink::~DatabaseSink()
124 {
125         if(shared)
126         {
127                 DBObject* obj = new DBObject();
128                 obj->quit = true;
129
130                 shared->queue.append(obj);
131
132                 g_thread_join(thread);
133                 g_thread_unref(thread);
134                 delete shared;
135         }
136 }
137
138
139 void DatabaseSink::supportedChanged(PropertyList supportedProperties)
140 {
141
142 }
143
144 PropertyList DatabaseSink::supported()
145 {
146         return mSupported;
147 }
148
149 void DatabaseSink::parseConfig()
150 {
151         JsonParser* parser = json_parser_new();
152         GError* error = nullptr;
153         if(!json_parser_load_from_data(parser, configuration["properties"].c_str(),configuration["properties"].size(), &error))
154         {
155                 DebugOut()<<"Failed to load config: "<<error->message;
156                 throw std::runtime_error("Failed to load config");
157         }
158
159         JsonNode* node = json_parser_get_root(parser);
160
161         if(node == nullptr)
162         {
163                 /// no options
164                 return;
165         }
166
167         JsonReader* reader = json_reader_new(node);
168
169         if(reader == nullptr)
170                 throw std::runtime_error("Unable to create JSON reader");
171
172         json_reader_read_member(reader,"properties");
173
174         g_assert(json_reader_is_array(reader));
175
176         for(int i=0; i < json_reader_count_elements(reader); i++)
177         {
178                 json_reader_read_element(reader, i);
179                 std::string prop = json_reader_get_string_value(reader);
180                 propertiesToSubscribeTo.push_back(prop);
181                 json_reader_end_element(reader);
182
183                 DebugOut()<<"DatabaseSink logging: "<<prop<<endl;
184         }
185
186         if(error) g_error_free(error);
187
188         g_object_unref(reader);
189         g_object_unref(parser);
190 }
191
192 void DatabaseSink::stopDb()
193 {
194         if(!shared)
195                 return;
196
197         DBObject *obj = new DBObject();
198         obj->quit = true;
199         shared->queue.append(obj);
200
201         g_thread_join(thread);
202
203         delete shared;
204         shared = NULL;
205 }
206
207 void DatabaseSink::startDb()
208 {
209         if(playback)
210         {
211                 DebugOut(0)<<"ERROR: tried to start logging during playback.  Only logging or playback can be used at one time"<<endl;
212                 return;
213         }
214
215         if(shared)
216         {
217                 DebugOut(0)<<"WARNING: logging already started.  doing nothing."<<endl;
218                 return;
219         }
220
221         initDb();
222
223 //      thread = g_thread_new("dbthread", cbFunc, shared);
224 }
225
226 void DatabaseSink::startPlayback()
227 {
228         if(playback)
229                 return;
230
231         playback = true;
232
233         initDb();
234
235         /// get supported:
236
237         vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
238
239         for(int i=0; i < supportedStr.size(); i++)
240         {
241                 if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
242                         mSupported.push_back(supportedStr[i][0]);
243         }
244
245         routingEngine->setSupported(supported(), this);
246
247         /// populate playback queue:
248
249         vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
250
251         if(playbackShared)
252         {
253                 delete playbackShared;
254         }
255
256         playbackShared = new PlaybackShared(routingEngine,uuid());
257
258         for(int i=0;i<results.size();i++)
259         {
260                 if(results[i].size() < 5)
261                 {
262                         throw std::runtime_error("column mismatch in query");
263                 }
264
265                 DBObject* obj = new DBObject();
266
267                 obj->key = results[i][0];
268                 obj->value = results[i][1];
269                 obj->source = results[i][2];
270                 obj->time = boost::lexical_cast<double>(results[i][3]);
271 //              obj->sequence = boost::lexical_cast<int>(results[i][4]);
272
273                 playbackShared->playbackQueue.push_back(obj);
274         }
275
276         g_timeout_add(0,getNextEvent,playbackShared);
277 }
278
279 void DatabaseSink::initDb()
280 {
281         if(shared) delete shared;
282
283         shared = new Shared;
284         shared->db->init(databaseName, tablename, tablecreate);
285 }
286
287 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
288 {
289         if(!shared)
290                 return;
291
292         DBObject* obj = new DBObject;
293         obj->key = property;
294         obj->value = value->toString();
295         obj->source = uuid;
296         obj->time = value->timestamp;
297         obj->sequence = value->sequence;
298
299         shared->queue.append(obj);
300 }
301
302
303 std::string DatabaseSink::uuid()
304 {
305         return "9f88156e-cb92-4472-8775-9c08addf50d3";
306 }
307
308 void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
309 {
310         reply->success = false;
311
312         if(reply->property == DatabaseFileProperty)
313         {
314                 StringPropertyType temp(databaseName);
315                 reply->value = &temp;
316
317                 reply->success = true;
318                 reply->completed(reply);
319
320                 return;
321         }
322         else if(reply->property == DatabaseLoggingProperty)
323         {
324                 BasicPropertyType<bool> temp = shared;
325
326                 reply->value = &temp;
327                 reply->success = true;
328                 reply->completed(reply);
329
330                 return;
331         }
332
333         else if(reply->property == DatabasePlaybackProperty)
334         {
335                 BasicPropertyType<bool> temp = playback;
336                 reply->value = &temp;
337                 reply->success = true;
338                 reply->completed(reply);
339
340                 return;
341         }
342
343         reply->completed(reply);
344 }
345
346 void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
347 {
348         BaseDB * db = new BaseDB();
349         db->init(databaseName, tablename, tablecreate);
350
351         ostringstream query;
352         query.precision(15);
353
354         query<<"SELECT * from "<<tablename<<" WHERE ";
355
356         if(reply->timeBegin && reply->timeEnd)
357         {
358                 query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
359         }
360
361         if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
362         {
363                 query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
364         }
365
366         std::vector<std::vector<string>> data = db->select(query.str());
367
368         std::list<AbstractPropertyType*> cleanup;
369
370         for(auto i=0;i<data.size();i++)
371         {
372                 if(data[i].size() != 5)
373                         continue;
374
375                 DBObject dbobj;
376                 dbobj.key = data[i][0];
377                 dbobj.value = data[i][1];
378                 dbobj.source = data[i][2];
379                 dbobj.time = boost::lexical_cast<double>(data[i][3]);
380                 dbobj.sequence = boost::lexical_cast<double>(data[i][4]);
381
382                 AbstractPropertyType* property = VehicleProperty::getPropertyTypeForPropertyNameValue(dbobj.key, dbobj.value);
383                 if(property)
384                 {
385                         property->timestamp = dbobj.time;
386                         property->sequence = dbobj.sequence;
387
388                         reply->values.push_back(property);
389                         cleanup.push_back(property);
390                 }
391         }
392
393         reply->success = true;
394         reply->completed(reply);
395
396         /// reply is owned by the requester of this call.  we own the data:
397         for(auto itr = cleanup.begin(); itr != cleanup.end(); itr++)
398         {
399                 delete *itr;
400         }
401
402         delete db;
403 }
404
405 AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
406 {
407         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
408         reply->success = false;
409
410         if(request.property == DatabaseLoggingProperty)
411         {
412                 if(request.value->value<bool>())
413                 {
414                         ///TODO: start or stop logging thread
415                         startDb();
416                         reply->success = true;
417                         BasicPropertyType<bool> temp(true);
418                         routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
419                 }
420                 else
421                 {
422                         stopDb();
423                         reply->success = true;
424                         BasicPropertyType<bool> temp(false);
425                         routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
426                 }
427         }
428
429         else if(request.property == DatabaseFileProperty)
430         {
431                 std::string fname = request.value->toString();
432
433                 databaseName = fname;
434
435                 StringPropertyType temp(databaseName);
436
437                 routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
438
439                 reply->success = true;
440         }
441         else if( request.property == DatabasePlaybackProperty)
442         {
443                 if(request.value->value<bool>())
444                 {
445                         startPlayback();
446                 }
447                 else
448                 {
449                         /// TODO: stop playback
450                 }
451
452                 reply->success = true;
453         }
454
455         return reply;
456 }
457
458 void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
459 {
460
461 }
462
463 void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
464 {
465 }