implemented new abstractrouting things in core
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
index a762fb3..423e043 100644 (file)
@@ -2,6 +2,8 @@
 #include "abstractroutingengine.h"
 #include "listplusplus.h"
 
+int bufferLength = 100;
+
 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
 {
        return new DatabaseSinkManager(routingengine, config);
@@ -44,7 +46,7 @@ void * cbFunc(gpointer data)
 
                insertList.push_back(dict);
 
-               if(insertList.size() > 100)
+               if(insertList.size() > bufferLength)
                {
                        shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
                        for(int i=0; i< insertList.size(); i++)
@@ -58,6 +60,16 @@ void * cbFunc(gpointer data)
                delete obj;
        }
 
+       /// final flush of whatever is still in the queue:
+
+       shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
+       for(int i=0; i< insertList.size(); i++)
+       {
+               DictionaryList<string> d = insertList[i];
+               shared->db->insert(d);
+       }
+       shared->db->exec("END TRANSACTION");
+
        return NULL;
 }
 
@@ -68,6 +80,9 @@ int getNextEvent(gpointer data)
        if(!pbshared)
                throw std::runtime_error("failed to cast PlaybackShared object");
 
+       if(pbshared->stop)
+               return 0;
+
        auto itr = pbshared->playbackQueue.begin();
 
        if(itr == pbshared->playbackQueue.end())
@@ -94,10 +109,11 @@ int getNextEvent(gpointer data)
                if(t > 0)
                        g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
                else
-                       g_timeout_add(t, getNextEvent, pbshared);
+                       g_timeout_add(1, getNextEvent, pbshared);
        }
 
        pbshared->playbackQueue.remove(obj);
+       DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
        delete obj;
 
        return 0;
@@ -112,7 +128,12 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
 
        if(config.find("databaseFile") != config.end())
        {
-               databaseName = config["databaseFile"];
+               setDatabaseFileName(config["databaseFile"]);
+       }
+
+       if(config.find("bufferLength") != config.end())
+       {
+               bufferLength = atoi(config["bufferLength"].c_str());
        }
 
        if(config.find("properties") != config.end())
@@ -125,32 +146,15 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
                engine->subscribeToProperty(*itr,this);
        }
 
-       mSupported.push_back(DatabaseFileProperty);
-       mSupported.push_back(DatabaseLoggingProperty);
-       mSupported.push_back(DatabasePlaybackProperty);
-
-
-       initDb();
-
-       /// get supported:
-
-       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
-
-       for(int i=0; i < supportedStr.size(); i++)
-       {
-               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
-                       mSupported.push_back(supportedStr[i][0]);
-       }
+       mSupported.push_back(DatabaseFile);
+       mSupported.push_back(DatabaseLogging);
+       mSupported.push_back(DatabasePlayback);
 
        routingEngine->setSupported(supported(), this);
 
        if(config.find("startOnLoad")!= config.end())
        {
-               AsyncSetPropertyRequest request;
-               request.property = DatabaseLoggingProperty;
-               request.value = new BasicPropertyType<bool>(true);
-
-               setProperty(request);
+               setLogging(true);
        }
 
        if(config.find("playbackMultiplier")!= config.end())
@@ -160,7 +164,7 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
 
        if(config.find("playbackOnLoad")!= config.end())
        {
-               startPlayback();
+               setPlayback(true);
        }
 
 
@@ -197,6 +201,12 @@ PropertyList DatabaseSink::supported()
        return mSupported;
 }
 
+PropertyInfo DatabaseSink::getPropertyInfo(VehicleProperty::Property property)
+{
+       /// TODO: Compute update frequency for properties in the database
+       return PropertyInfo();
+}
+
 void DatabaseSink::parseConfig()
 {
        json_object *rootobject;
@@ -281,6 +291,10 @@ void DatabaseSink::startPlayback()
 
        vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
 
+       /// we are done with shared.  clean up:
+       delete shared;
+       shared = NULL;
+
        if(playbackShared)
        {
                delete playbackShared;
@@ -301,6 +315,9 @@ void DatabaseSink::startPlayback()
                obj->value = results[i][1];
                obj->source = results[i][2];
                obj->time = boost::lexical_cast<double>(results[i][3]);
+
+               /// TODO: figure out why sequence is broken:
+
 //             obj->sequence = boost::lexical_cast<int>(results[i][4]);
 
                playbackShared->playbackQueue.push_back(obj);
@@ -317,11 +334,55 @@ void DatabaseSink::initDb()
        shared->db->init(databaseName, tablename, tablecreate);
 }
 
+void DatabaseSink::setPlayback(bool v)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabasePlayback;
+       request.value = new DatabasePlaybackType(v);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setLogging(bool b)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabaseLogging;
+       request.value = new DatabaseLoggingType(b);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setDatabaseFileName(string filename)
+{
+       databaseName = filename;
+
+       initDb();
+
+       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
+
+       for(int i=0; i < supportedStr.size(); i++)
+       {
+               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
+                       mSupported.push_back(supportedStr[i][0]);
+       }
+
+       delete shared;
+       shared = NULL;
+
+       routingEngine->setSupported(mSupported, this);
+}
+
 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
 {
        if(!shared)
                return;
 
+       if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(property))
+       {
+               mSupported.push_back(property);
+               routingEngine->setSupported(mSupported, this);
+       }
+
        DBObject* obj = new DBObject;
        obj->key = property;
        obj->value = value->toString();
@@ -342,9 +403,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 {
        reply->success = false;
 
-       if(reply->property == DatabaseFileProperty)
+       if(reply->property == DatabaseFile)
        {
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
                reply->value = &temp;
 
                reply->success = true;
@@ -352,9 +413,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 
                return;
        }
-       else if(reply->property == DatabaseLoggingProperty)
+       else if(reply->property == DatabaseLogging)
        {
-               BasicPropertyType<bool> temp = shared;
+               DatabaseLoggingType temp = shared;
 
                reply->value = &temp;
                reply->success = true;
@@ -363,9 +424,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
                return;
        }
 
-       else if(reply->property == DatabasePlaybackProperty)
+       else if(reply->property == DatabasePlayback)
        {
-               BasicPropertyType<bool> temp = playback;
+               DatabasePlaybackType temp = playback;
                reply->value = &temp;
                reply->success = true;
                reply->completed(reply);
@@ -384,7 +445,7 @@ void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        ostringstream query;
        query.precision(15);
 
-       query<<"SELECT * from "<<tablename<<" WHERE ";
+       query<<"SELECT * from "<<tablename<<" WHERE key='"<<reply->property<<"' AND";
 
        if(reply->timeBegin && reply->timeEnd)
        {
@@ -434,54 +495,58 @@ AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
        AsyncPropertyReply* reply = new AsyncPropertyReply(request);
        reply->success = false;
 
-       if(request.property == DatabaseLoggingProperty)
+       if(request.property == DatabaseLogging)
        {
                if(request.value->value<bool>())
                {
-                       ///TODO: start or stop logging thread
+                       setPlayback(false);
                        startDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(true);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(true);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
                else
                {
                        stopDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(false);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(false);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
        }
 
-       else if(request.property == DatabaseFileProperty)
+       else if(request.property == DatabaseFile)
        {
                std::string fname = request.value->toString();
 
                databaseName = fname;
 
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
 
-               routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
+               routingEngine->updateProperty(DatabaseFile,&temp,uuid());
 
                reply->success = true;
        }
-       else if( request.property == DatabasePlaybackProperty)
+       else if( request.property == DatabasePlayback)
        {
                if(request.value->value<bool>())
                {
+                       setLogging(false);
                        startPlayback();
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback,&temp,uuid());
                }
                else
                {
-                       /// TODO: stop playback
+                       if(playbackShared)
+                               playbackShared->stop = true;
+
+                       playback = false;
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback, &temp, uuid());
                }
 
                reply->success = true;