implemented new abstractrouting things in core
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
index 63af05c..423e043 100644 (file)
@@ -2,6 +2,8 @@
 #include "abstractroutingengine.h"
 #include "listplusplus.h"
 
+int bufferLength = 100;
+
 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
 {
        return new DatabaseSinkManager(routingengine, config);
@@ -16,6 +18,8 @@ void * cbFunc(gpointer data)
                throw std::runtime_error("Could not cast shared object.");
        }
 
+       vector<DictionaryList<string> > insertList;
+
        while(1)
        {
                DBObject* obj = shared->queue.pop();
@@ -40,10 +44,32 @@ void * cbFunc(gpointer data)
                dict.push_back(four);
                dict.push_back(five);
 
-               shared->db->insert(dict);
+               insertList.push_back(dict);
+
+               if(insertList.size() > bufferLength)
+               {
+                       shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
+                       for(int i=0; i< insertList.size(); i++)
+                       {
+                               DictionaryList<string> d = insertList[i];
+                               shared->db->insert(d);
+                       }
+                       shared->db->exec("END TRANSACTION");
+                       insertList.clear();
+               }
                delete obj;
        }
 
+       /// final flush of whatever is still in the queue:
+
+       shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
+       for(int i=0; i< insertList.size(); i++)
+       {
+               DictionaryList<string> d = insertList[i];
+               shared->db->insert(d);
+       }
+       shared->db->exec("END TRANSACTION");
+
        return NULL;
 }
 
@@ -54,6 +80,9 @@ int getNextEvent(gpointer data)
        if(!pbshared)
                throw std::runtime_error("failed to cast PlaybackShared object");
 
+       if(pbshared->stop)
+               return 0;
+
        auto itr = pbshared->playbackQueue.begin();
 
        if(itr == pbshared->playbackQueue.end())
@@ -69,7 +98,7 @@ int getNextEvent(gpointer data)
        {
                pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid);
                value->timestamp = obj->time;
-               value->sequence = obj->sequence;
+               //value->sequence = obj->sequence;
        }
 
        if(++itr != pbshared->playbackQueue.end())
@@ -80,10 +109,11 @@ int getNextEvent(gpointer data)
                if(t > 0)
                        g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
                else
-                       g_timeout_add(t, getNextEvent, pbshared);
+                       g_timeout_add(1, getNextEvent, pbshared);
        }
 
        pbshared->playbackQueue.remove(obj);
+       DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
        delete obj;
 
        return 0;
@@ -96,40 +126,47 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
        tablename = "data";
        tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
 
-       //startDb();
-
-       if(config.find("startOnLoad")!= config.end())
+       if(config.find("databaseFile") != config.end())
        {
-               startDb();
+               setDatabaseFileName(config["databaseFile"]);
        }
 
-       if(config.find("playbackMultiplier")!= config.end())
+       if(config.find("bufferLength") != config.end())
        {
-               playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
+               bufferLength = atoi(config["bufferLength"].c_str());
        }
 
-       if(config.find("playbackOnLoad")!= config.end())
+       if(config.find("properties") != config.end())
        {
-               startPlayback();
+               parseConfig();
        }
 
-       if(config.find("databaseFile") != config.end())
+       for(auto itr=propertiesToSubscribeTo.begin();itr!=propertiesToSubscribeTo.end();itr++)
        {
-               databaseName = config["databaseFile"];
+               engine->subscribeToProperty(*itr,this);
        }
 
-       parseConfig();
+       mSupported.push_back(DatabaseFile);
+       mSupported.push_back(DatabaseLogging);
+       mSupported.push_back(DatabasePlayback);
 
-       for(auto itr=propertiesToSubscribeTo.begin();itr!=propertiesToSubscribeTo.end();itr++)
+       routingEngine->setSupported(supported(), this);
+
+       if(config.find("startOnLoad")!= config.end())
        {
-               engine->subscribeToProperty(*itr,this);
+               setLogging(true);
        }
 
-       mSupported.push_back(DatabaseFileProperty);
-       mSupported.push_back(DatabaseLoggingProperty);
-       mSupported.push_back(DatabasePlaybackProperty);
+       if(config.find("playbackMultiplier")!= config.end())
+       {
+               playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
+       }
+
+       if(config.find("playbackOnLoad")!= config.end())
+       {
+               setPlayback(true);
+       }
 
-       routingEngine->setSupported(mSupported,this);
 
 }
 
@@ -164,6 +201,12 @@ PropertyList DatabaseSink::supported()
        return mSupported;
 }
 
+PropertyInfo DatabaseSink::getPropertyInfo(VehicleProperty::Property property)
+{
+       /// TODO: Compute update frequency for properties in the database
+       return PropertyInfo();
+}
+
 void DatabaseSink::parseConfig()
 {
        json_object *rootobject;
@@ -232,7 +275,7 @@ void DatabaseSink::startDb()
 
        initDb();
 
-//     thread = g_thread_new("dbthread", cbFunc, shared);
+       thread = g_thread_new("dbthread", cbFunc, shared);
 }
 
 void DatabaseSink::startPlayback()
@@ -244,22 +287,14 @@ void DatabaseSink::startPlayback()
 
        initDb();
 
-       /// get supported:
-
-       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
-
-       for(int i=0; i < supportedStr.size(); i++)
-       {
-               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
-                       mSupported.push_back(supportedStr[i][0]);
-       }
-
-       routingEngine->setSupported(supported(), this);
-
        /// populate playback queue:
 
        vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
 
+       /// we are done with shared.  clean up:
+       delete shared;
+       shared = NULL;
+
        if(playbackShared)
        {
                delete playbackShared;
@@ -280,6 +315,9 @@ void DatabaseSink::startPlayback()
                obj->value = results[i][1];
                obj->source = results[i][2];
                obj->time = boost::lexical_cast<double>(results[i][3]);
+
+               /// TODO: figure out why sequence is broken:
+
 //             obj->sequence = boost::lexical_cast<int>(results[i][4]);
 
                playbackShared->playbackQueue.push_back(obj);
@@ -296,11 +334,55 @@ void DatabaseSink::initDb()
        shared->db->init(databaseName, tablename, tablecreate);
 }
 
+void DatabaseSink::setPlayback(bool v)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabasePlayback;
+       request.value = new DatabasePlaybackType(v);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setLogging(bool b)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabaseLogging;
+       request.value = new DatabaseLoggingType(b);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setDatabaseFileName(string filename)
+{
+       databaseName = filename;
+
+       initDb();
+
+       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
+
+       for(int i=0; i < supportedStr.size(); i++)
+       {
+               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
+                       mSupported.push_back(supportedStr[i][0]);
+       }
+
+       delete shared;
+       shared = NULL;
+
+       routingEngine->setSupported(mSupported, this);
+}
+
 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
 {
        if(!shared)
                return;
 
+       if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(property))
+       {
+               mSupported.push_back(property);
+               routingEngine->setSupported(mSupported, this);
+       }
+
        DBObject* obj = new DBObject;
        obj->key = property;
        obj->value = value->toString();
@@ -321,9 +403,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 {
        reply->success = false;
 
-       if(reply->property == DatabaseFileProperty)
+       if(reply->property == DatabaseFile)
        {
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
                reply->value = &temp;
 
                reply->success = true;
@@ -331,9 +413,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 
                return;
        }
-       else if(reply->property == DatabaseLoggingProperty)
+       else if(reply->property == DatabaseLogging)
        {
-               BasicPropertyType<bool> temp = shared;
+               DatabaseLoggingType temp = shared;
 
                reply->value = &temp;
                reply->success = true;
@@ -342,9 +424,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
                return;
        }
 
-       else if(reply->property == DatabasePlaybackProperty)
+       else if(reply->property == DatabasePlayback)
        {
-               BasicPropertyType<bool> temp = playback;
+               DatabasePlaybackType temp = playback;
                reply->value = &temp;
                reply->success = true;
                reply->completed(reply);
@@ -363,7 +445,7 @@ void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        ostringstream query;
        query.precision(15);
 
-       query<<"SELECT * from "<<tablename<<" WHERE ";
+       query<<"SELECT * from "<<tablename<<" WHERE key='"<<reply->property<<"' AND";
 
        if(reply->timeBegin && reply->timeEnd)
        {
@@ -405,12 +487,6 @@ void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        reply->success = true;
        reply->completed(reply);
 
-       /// reply is owned by the requester of this call.  we own the data:
-       for(auto itr = cleanup.begin(); itr != cleanup.end(); itr++)
-       {
-               delete *itr;
-       }
-
        delete db;
 }
 
@@ -419,54 +495,58 @@ AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
        AsyncPropertyReply* reply = new AsyncPropertyReply(request);
        reply->success = false;
 
-       if(request.property == DatabaseLoggingProperty)
+       if(request.property == DatabaseLogging)
        {
                if(request.value->value<bool>())
                {
-                       ///TODO: start or stop logging thread
+                       setPlayback(false);
                        startDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(true);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(true);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
                else
                {
                        stopDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(false);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(false);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
        }
 
-       else if(request.property == DatabaseFileProperty)
+       else if(request.property == DatabaseFile)
        {
                std::string fname = request.value->toString();
 
                databaseName = fname;
 
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
 
-               routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
+               routingEngine->updateProperty(DatabaseFile,&temp,uuid());
 
                reply->success = true;
        }
-       else if( request.property == DatabasePlaybackProperty)
+       else if( request.property == DatabasePlayback)
        {
                if(request.value->value<bool>())
                {
+                       setLogging(false);
                        startPlayback();
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback,&temp,uuid());
                }
                else
                {
-                       /// TODO: stop playback
+                       if(playbackShared)
+                               playbackShared->stop = true;
+
+                       playback = false;
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback, &temp, uuid());
                }
 
                reply->success = true;