implemented new abstractrouting things in core
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.cpp
index a0aedbf..423e043 100644 (file)
@@ -2,7 +2,7 @@
 #include "abstractroutingengine.h"
 #include "listplusplus.h"
 
-#include <json-glib/json-glib.h>
+int bufferLength = 100;
 
 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
 {
@@ -18,6 +18,8 @@ void * cbFunc(gpointer data)
                throw std::runtime_error("Could not cast shared object.");
        }
 
+       vector<DictionaryList<string> > insertList;
+
        while(1)
        {
                DBObject* obj = shared->queue.pop();
@@ -42,10 +44,32 @@ void * cbFunc(gpointer data)
                dict.push_back(four);
                dict.push_back(five);
 
-               shared->db->insert(dict);
+               insertList.push_back(dict);
+
+               if(insertList.size() > bufferLength)
+               {
+                       shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
+                       for(int i=0; i< insertList.size(); i++)
+                       {
+                               DictionaryList<string> d = insertList[i];
+                               shared->db->insert(d);
+                       }
+                       shared->db->exec("END TRANSACTION");
+                       insertList.clear();
+               }
                delete obj;
        }
 
+       /// final flush of whatever is still in the queue:
+
+       shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
+       for(int i=0; i< insertList.size(); i++)
+       {
+               DictionaryList<string> d = insertList[i];
+               shared->db->insert(d);
+       }
+       shared->db->exec("END TRANSACTION");
+
        return NULL;
 }
 
@@ -56,6 +80,9 @@ int getNextEvent(gpointer data)
        if(!pbshared)
                throw std::runtime_error("failed to cast PlaybackShared object");
 
+       if(pbshared->stop)
+               return 0;
+
        auto itr = pbshared->playbackQueue.begin();
 
        if(itr == pbshared->playbackQueue.end())
@@ -71,7 +98,7 @@ int getNextEvent(gpointer data)
        {
                pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid);
                value->timestamp = obj->time;
-               value->sequence = obj->sequence;
+               //value->sequence = obj->sequence;
        }
 
        if(++itr != pbshared->playbackQueue.end())
@@ -80,43 +107,66 @@ int getNextEvent(gpointer data)
                double t = o2->time - obj->time;
 
                if(t > 0)
-                       g_timeout_add(t*1000, getNextEvent, pbshared);
+                       g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
                else
-                       g_timeout_add(t, getNextEvent, pbshared);
+                       g_timeout_add(1, getNextEvent, pbshared);
        }
 
        pbshared->playbackQueue.remove(obj);
+       DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
        delete obj;
 
        return 0;
 }
 
 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
-       :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL)
+       :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL), playbackMultiplier(1)
 {
        databaseName = "storage";
        tablename = "data";
        tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
 
-       //startDb();
+       if(config.find("databaseFile") != config.end())
+       {
+               setDatabaseFileName(config["databaseFile"]);
+       }
 
-       if(config.find("startOnLoad")!= config.end())
+       if(config.find("bufferLength") != config.end())
        {
-               startDb();
+               bufferLength = atoi(config["bufferLength"].c_str());
        }
 
-       parseConfig();
+       if(config.find("properties") != config.end())
+       {
+               parseConfig();
+       }
 
        for(auto itr=propertiesToSubscribeTo.begin();itr!=propertiesToSubscribeTo.end();itr++)
        {
                engine->subscribeToProperty(*itr,this);
        }
 
-       mSupported.push_back(DatabaseFileProperty);
-       mSupported.push_back(DatabaseLoggingProperty);
-       mSupported.push_back(DatabasePlaybackProperty);
+       mSupported.push_back(DatabaseFile);
+       mSupported.push_back(DatabaseLogging);
+       mSupported.push_back(DatabasePlayback);
+
+       routingEngine->setSupported(supported(), this);
+
+       if(config.find("startOnLoad")!= config.end())
+       {
+               setLogging(true);
+       }
+
+       if(config.find("playbackMultiplier")!= config.end())
+       {
+               playbackMultiplier = boost::lexical_cast<uint>(config["playbackMultiplier"]);
+       }
+
+       if(config.find("playbackOnLoad")!= config.end())
+       {
+               setPlayback(true);
+       }
 
-       routingEngine->setSupported(mSupported,this);
 
 }
 
@@ -133,6 +183,11 @@ DatabaseSink::~DatabaseSink()
                g_thread_unref(thread);
                delete shared;
        }
+
+       if(playbackShared)
+       {
+               delete playbackShared;
+       }
 }
 
 
@@ -146,47 +201,47 @@ PropertyList DatabaseSink::supported()
        return mSupported;
 }
 
+PropertyInfo DatabaseSink::getPropertyInfo(VehicleProperty::Property property)
+{
+       /// TODO: Compute update frequency for properties in the database
+       return PropertyInfo();
+}
+
 void DatabaseSink::parseConfig()
 {
-       JsonParser* parser = json_parser_new();
-       GError* error = nullptr;
-       if(!json_parser_load_from_data(parser, configuration["properties"].c_str(),configuration["properties"].size(), &error))
+       json_object *rootobject;
+       json_tokener *tokener = json_tokener_new();
+       enum json_tokener_error err;
+       do
+       {
+               rootobject = json_tokener_parse_ex(tokener, configuration["properties"].c_str(),configuration["properties"].size());
+       } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
+       if (err != json_tokener_success)
        {
-               DebugOut()<<"Failed to load config: "<<error->message;
-               throw std::runtime_error("Failed to load config");
+               fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
        }
-
-       JsonNode* node = json_parser_get_root(parser);
-
-       if(node == nullptr)
+       if (tokener->char_offset < configuration["properties"].size()) // XXX shouldn't access internal fields
        {
-               /// no options
-               return;
+               //Should handle the extra data here sometime...
        }
-
-       JsonReader* reader = json_reader_new(node);
-
-       if(reader == nullptr)
-               throw std::runtime_error("Unable to create JSON reader");
-
-       json_reader_read_member(reader,"properties");
-
-       g_assert(json_reader_is_array(reader));
-
-       for(int i=0; i < json_reader_count_elements(reader); i++)
+       
+       json_object *propobject = json_object_object_get(rootobject,"properties");
+       
+       g_assert(json_object_get_type(propobject) == json_type_array);
+
+       array_list *proplist = json_object_get_array(propobject);
+       
+       for(int i=0; i < array_list_length(proplist); i++)
        {
-               json_reader_read_element(reader, i);
-               std::string prop = json_reader_get_string_value(reader);
+               json_object *idxobj = (json_object*)array_list_get_idx(proplist,i);
+               std::string prop = json_object_get_string(idxobj);
                propertiesToSubscribeTo.push_back(prop);
-               json_reader_end_element(reader);
 
                DebugOut()<<"DatabaseSink logging: "<<prop<<endl;
        }
 
-       if(error) g_error_free(error);
-
-       g_object_unref(reader);
-       g_object_unref(parser);
+       json_object_put(propobject);
+       json_object_put(rootobject);
 }
 
 void DatabaseSink::stopDb()
@@ -220,7 +275,7 @@ void DatabaseSink::startDb()
 
        initDb();
 
-//     thread = g_thread_new("dbthread", cbFunc, shared);
+       thread = g_thread_new("dbthread", cbFunc, shared);
 }
 
 void DatabaseSink::startPlayback()
@@ -232,28 +287,20 @@ void DatabaseSink::startPlayback()
 
        initDb();
 
-       /// get supported:
-
-       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
-
-       for(int i=0; i < supportedStr.size(); i++)
-       {
-               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
-                       mSupported.push_back(supportedStr[i][0]);
-       }
-
-       routingEngine->setSupported(supported(), this);
-
        /// populate playback queue:
 
        vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
 
+       /// we are done with shared.  clean up:
+       delete shared;
+       shared = NULL;
+
        if(playbackShared)
        {
                delete playbackShared;
        }
 
-       playbackShared = new PlaybackShared(routingEngine,uuid());
+       playbackShared = new PlaybackShared(routingEngine, uuid(), playbackMultiplier);
 
        for(int i=0;i<results.size();i++)
        {
@@ -268,6 +315,9 @@ void DatabaseSink::startPlayback()
                obj->value = results[i][1];
                obj->source = results[i][2];
                obj->time = boost::lexical_cast<double>(results[i][3]);
+
+               /// TODO: figure out why sequence is broken:
+
 //             obj->sequence = boost::lexical_cast<int>(results[i][4]);
 
                playbackShared->playbackQueue.push_back(obj);
@@ -284,11 +334,55 @@ void DatabaseSink::initDb()
        shared->db->init(databaseName, tablename, tablecreate);
 }
 
+void DatabaseSink::setPlayback(bool v)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabasePlayback;
+       request.value = new DatabasePlaybackType(v);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setLogging(bool b)
+{
+       AsyncSetPropertyRequest request;
+       request.property = DatabaseLogging;
+       request.value = new DatabaseLoggingType(b);
+
+       setProperty(request);
+}
+
+void DatabaseSink::setDatabaseFileName(string filename)
+{
+       databaseName = filename;
+
+       initDb();
+
+       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
+
+       for(int i=0; i < supportedStr.size(); i++)
+       {
+               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
+                       mSupported.push_back(supportedStr[i][0]);
+       }
+
+       delete shared;
+       shared = NULL;
+
+       routingEngine->setSupported(mSupported, this);
+}
+
 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
 {
        if(!shared)
                return;
 
+       if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(property))
+       {
+               mSupported.push_back(property);
+               routingEngine->setSupported(mSupported, this);
+       }
+
        DBObject* obj = new DBObject;
        obj->key = property;
        obj->value = value->toString();
@@ -309,9 +403,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 {
        reply->success = false;
 
-       if(reply->property == DatabaseFileProperty)
+       if(reply->property == DatabaseFile)
        {
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
                reply->value = &temp;
 
                reply->success = true;
@@ -319,9 +413,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 
                return;
        }
-       else if(reply->property == DatabaseLoggingProperty)
+       else if(reply->property == DatabaseLogging)
        {
-               BasicPropertyType<bool> temp = shared;
+               DatabaseLoggingType temp = shared;
 
                reply->value = &temp;
                reply->success = true;
@@ -330,9 +424,9 @@ void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
                return;
        }
 
-       else if(reply->property == DatabasePlaybackProperty)
+       else if(reply->property == DatabasePlayback)
        {
-               BasicPropertyType<bool> temp = playback;
+               DatabasePlaybackType temp = playback;
                reply->value = &temp;
                reply->success = true;
                reply->completed(reply);
@@ -351,7 +445,7 @@ void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        ostringstream query;
        query.precision(15);
 
-       query<<"SELECT * from "<<tablename<<" WHERE ";
+       query<<"SELECT * from "<<tablename<<" WHERE key='"<<reply->property<<"' AND";
 
        if(reply->timeBegin && reply->timeEnd)
        {
@@ -393,12 +487,6 @@ void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        reply->success = true;
        reply->completed(reply);
 
-       /// reply is owned by the requester of this call.  we own the data:
-       for(auto itr = cleanup.begin(); itr != cleanup.end(); itr++)
-       {
-               delete *itr;
-       }
-
        delete db;
 }
 
@@ -407,54 +495,58 @@ AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
        AsyncPropertyReply* reply = new AsyncPropertyReply(request);
        reply->success = false;
 
-       if(request.property == DatabaseLoggingProperty)
+       if(request.property == DatabaseLogging)
        {
                if(request.value->value<bool>())
                {
-                       ///TODO: start or stop logging thread
+                       setPlayback(false);
                        startDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(true);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(true);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
                else
                {
                        stopDb();
                        reply->success = true;
-                       BasicPropertyType<bool> temp(false);
-                       routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid());
+                       DatabaseLoggingType temp(false);
+                       routingEngine->updateProperty(DatabaseLogging,&temp,uuid());
                }
        }
 
-       else if(request.property == DatabaseFileProperty)
+       else if(request.property == DatabaseFile)
        {
                std::string fname = request.value->toString();
 
                databaseName = fname;
 
-               StringPropertyType temp(databaseName);
+               DatabaseFileType temp(databaseName);
 
-               routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
+               routingEngine->updateProperty(DatabaseFile,&temp,uuid());
 
                reply->success = true;
        }
-       else if( request.property == DatabasePlaybackProperty)
+       else if( request.property == DatabasePlayback)
        {
                if(request.value->value<bool>())
                {
+                       setLogging(false);
                        startPlayback();
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback,&temp,uuid());
                }
                else
                {
-                       /// TODO: stop playback
+                       if(playbackShared)
+                               playbackShared->stop = true;
+
+                       playback = false;
 
-                       BasicPropertyType<bool> temp(true);
+                       DatabasePlaybackType temp(playback);
 
-                       routingEngine->updateProperty(DatabasePlaybackProperty,&temp,uuid());
+                       routingEngine->updateProperty(DatabasePlayback, &temp, uuid());
                }
 
                reply->success = true;