big database plugin update
authorKevron Rees <tripzero.kev@gmail.com>
Thu, 14 Mar 2013 18:54:22 +0000 (11:54 -0700)
committerKevron Rees <tripzero.kev@gmail.com>
Thu, 14 Mar 2013 18:54:22 +0000 (11:54 -0700)
examples/databaseconfig
examples/storage
plugins/database/databasesink.cpp
plugins/database/databasesink.h

index 9007fe5..23cde22 100644 (file)
@@ -14,6 +14,9 @@
                {
                        "name" : "Example Sink",
                        "path" : "/usr/lib/automotive-message-broker/examplesinkplugin.so"
+               },
+               {
+                       "path" : "/usr/lib/automotive-message-broker/dbussinkplugin.so"
                }
        ]
 }
index 19a06f0..c21b03b 100644 (file)
Binary files a/examples/storage and b/examples/storage differ
index 85c751d..50c97b3 100644 (file)
@@ -1,5 +1,6 @@
 #include "databasesink.h"
 #include "abstractroutingengine.h"
+#include "listplusplus.h"
 
 #include <json-glib/json-glib.h>
 
@@ -8,52 +9,99 @@ extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, ma
        return new DatabaseSinkManager(routingengine, config);
 }
 
-DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
-       :AbstractSource(engine,config)
+void * cbFunc(gpointer data)
 {
-       databaseName = "storage";
-       tablename = "data";
-       tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
-       shared = new Shared;
-       shared->db->init(databaseName, tablename, tablecreate);
+       Shared *shared = static_cast<Shared*>(data);
+
+       if(!shared)
+       {
+               throw std::runtime_error("Could not cast shared object.");
+       }
 
-       auto cb = [](gpointer data)
+       while(1)
        {
-               Shared *shared = (Shared*)data;
+               DBObject* obj = shared->queue.pop();
 
-               while(1)
+               if( obj->quit )
                {
-                       DBObject* obj = shared->queue.pop();
+                       delete obj;
+                       break;
+               }
 
-                       if( obj->quit )
-                       {
-                               delete obj;
-                               break;
-                       }
+               DictionaryList<string> dict;
 
-                       DictionaryList<string> dict;
+               NameValuePair<string> one("key", obj->key);
+               NameValuePair<string> two("value", obj->value);
+               NameValuePair<string> three("source", obj->source);
+               NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time));
+               NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence));
 
-                       NameValuePair<string> one("key", obj->key);
-                       NameValuePair<string> two("value", obj->value);
-                       NameValuePair<string> three("source", obj->source);
-                       NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time));
-                       NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence));
+               dict.push_back(one);
+               dict.push_back(two);
+               dict.push_back(three);
+               dict.push_back(four);
+               dict.push_back(five);
 
-                       dict.push_back(one);
-                       dict.push_back(two);
-                       dict.push_back(three);
-                       dict.push_back(four);
-                       dict.push_back(five);
+               shared->db->insert(dict);
+               delete obj;
+       }
 
-                       shared->db->insert(dict);
-                       delete obj;
-               }
+       return NULL;
+}
+
+int getNextEvent(gpointer data)
+{
+       PlaybackShared* pbshared = static_cast<PlaybackShared*>(data);
 
-               void* ret = NULL;
-               return ret;
-       };
+       if(!pbshared)
+               throw std::runtime_error("failed to cast PlaybackShared object");
 
-       thread = g_thread_new("dbthread", cb, shared);
+       auto itr = pbshared->playbackQueue.begin();
+
+       if(itr == pbshared->playbackQueue.end())
+       {
+               return 0;
+       }
+
+       DBObject* obj = *itr;
+
+       AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value);
+
+       if(value)
+       {
+               pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid);
+       }
+
+       if(++itr != pbshared->playbackQueue.end())
+       {
+               DBObject *o2 = *itr;
+               double t = o2->time - obj->time;
+
+               if(t > 0)
+                       g_timeout_add(t*1000, getNextEvent, pbshared);
+               else
+                       g_timeout_add(t, getNextEvent, pbshared);
+       }
+
+       pbshared->playbackQueue.remove(obj);
+       delete obj;
+
+       return 0;
+}
+
+DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
+       :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL)
+{
+       databaseName = "storage";
+       tablename = "data";
+       tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
+
+       //startDb();
+
+       if(config.find("startOnLoad")!= config.end())
+       {
+               startDb();
+       }
 
        parseConfig();
 
@@ -62,18 +110,27 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
                engine->subscribeToProperty(*itr,this);
        }
 
+       mSupported.push_back(DatabaseFileProperty);
+       mSupported.push_back(DatabaseLoggingProperty);
+       mSupported.push_back(DatabasePlaybackProperty);
+
+       routingEngine->setSupported(mSupported,this);
+
 }
 
 DatabaseSink::~DatabaseSink()
 {
-       DBObject* obj = new DBObject();
-       obj->quit = true;
-
-       shared->queue.append(obj);
+       if(shared)
+       {
+               DBObject* obj = new DBObject();
+               obj->quit = true;
 
-       g_thread_join(thread);
+               shared->queue.append(obj);
 
-       delete shared;
+               g_thread_join(thread);
+               g_thread_unref(thread);
+               delete shared;
+       }
 }
 
 
@@ -84,13 +141,7 @@ void DatabaseSink::supportedChanged(PropertyList supportedProperties)
 
 PropertyList DatabaseSink::supported()
 {
-       PropertyList props;
-
-       props.push_back(VehicleProperty::EngineSpeed);
-       props.push_back(VehicleProperty::VehicleSpeed);
-       props.push_back(DatabaseLoggingProperty);
-
-       return props;
+       return mSupported;
 }
 
 void DatabaseSink::parseConfig()
@@ -133,8 +184,105 @@ void DatabaseSink::parseConfig()
        g_object_unref(parser);
 }
 
+void DatabaseSink::stopDb()
+{
+       if(!shared)
+               return;
+
+       DBObject *obj = new DBObject();
+       obj->quit = true;
+       shared->queue.append(obj);
+
+       g_thread_join(thread);
+       g_thread_unref(thread);
+
+       delete shared;
+       shared = NULL;
+}
+
+void DatabaseSink::startDb()
+{
+       if(playback)
+       {
+               DebugOut(0)<<"ERROR: tried to start logging during playback.  Only logging or playback can be used at one time"<<endl;
+               return;
+       }
+
+       if(shared)
+       {
+               DebugOut(0)<<"WARNING: logging already started.  doing nothing."<<endl;
+               return;
+       }
+
+       initDb();
+
+//     thread = g_thread_new("dbthread", cbFunc, shared);
+}
+
+void DatabaseSink::startPlayback()
+{
+       if(playback)
+               return;
+
+       playback = true;
+
+       initDb();
+
+       /// get supported:
+
+       vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename);
+
+       for(int i=0; i < supportedStr.size(); i++)
+       {
+               if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0]))
+                       mSupported.push_back(supportedStr[i][0]);
+       }
+
+       routingEngine->setSupported(supported(), this);
+
+       /// populate playback queue:
+
+       vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename);
+
+       if(playbackShared)
+       {
+               delete playbackShared;
+       }
+
+       playbackShared = new PlaybackShared(routingEngine,uuid());
+
+       for(int i=0;i<results.size();i++)
+       {
+               if(results[i].size() < 5)
+               {
+                       throw std::runtime_error("column mismatch in query");
+               }
+
+               DBObject* obj = new DBObject();
+
+               obj->key = results[i][0];
+               obj->value = results[i][1];
+               obj->source = results[i][2];
+               obj->time = boost::lexical_cast<double>(results[i][3]);
+               obj->sequence = boost::lexical_cast<uint16_t>(results[i][4]);
+
+               playbackShared->playbackQueue.push_back(obj);
+       }
+}
+
+void DatabaseSink::initDb()
+{
+       if(shared) delete shared;
+
+       shared = new Shared;
+       shared->db->init(databaseName, tablename, tablecreate);
+}
+
 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
 {
+       if(!shared)
+               return;
+
        DBObject* obj = new DBObject;
        obj->key = property;
        obj->value = value->toString();
@@ -153,7 +301,40 @@ std::string DatabaseSink::uuid()
 
 void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
 {
+       reply->success = false;
+
+       if(reply->property == DatabaseFileProperty)
+       {
+               StringPropertyType temp(databaseName);
+               reply->value = &temp;
+
+               reply->success = true;
+               reply->completed(reply);
+
+               return;
+       }
+       else if(reply->property == DatabaseLoggingProperty)
+       {
+               BasicPropertyType<bool> temp = shared;
 
+               reply->value = &temp;
+               reply->success = true;
+               reply->completed(reply);
+
+               return;
+       }
+
+       else if(reply->property == DatabasePlaybackProperty)
+       {
+               BasicPropertyType<bool> temp = playback;
+               reply->value = &temp;
+               reply->success = true;
+               reply->completed(reply);
+
+               return;
+       }
+
+       reply->completed(reply);
 }
 
 void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
@@ -225,7 +406,40 @@ AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
                if(request.value->value<bool>())
                {
                        ///TODO: start or stop logging thread
+                       startDb();
+                       reply->success = true;
                }
+               else
+               {
+                       stopDb();
+                       reply->success = true;
+               }
+       }
+
+       else if(request.property == DatabaseFileProperty)
+       {
+               std::string fname = request.value->toString();
+
+               databaseName = fname;
+
+               StringPropertyType temp(databaseName);
+
+               routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid());
+
+               reply->success = true;
+       }
+       else if( request.property == DatabasePlaybackProperty)
+       {
+               if(request.value->value<bool>())
+               {
+                       startPlayback();
+               }
+               else
+               {
+                       /// TODO: stop playback
+               }
+
+               reply->success = true;
        }
 
        return reply;
index c41017c..7a2607f 100644 (file)
 
 #include <glib.h>
 
+#include <functional>
+
 #define DatabaseLoggingProperty "DatabaseLogging"
+#define DatabasePlaybackProperty "DatabasePlayback"
+#define DatabaseFileProperty "DatabaseFile"
 
 template <typename T>
 class Queue
@@ -34,25 +38,26 @@ class Queue
 public:
        Queue()
        {
-               mutex = g_mutex_new();
+               g_mutex_init(&mutex);
+               g_cond_init(&cond);
        }
 
        int count()
        {
-               g_mutex_lock(mutex);
+               g_mutex_lock(&mutex);
                int ret = mQueue.count();
-               g_mutex_unlock(mutex);
+               g_mutex_unlock(&mutex);
 
                return ret;
        }
 
        T pop()
        {
-               g_mutex_lock(mutex);
+               g_mutex_lock(&mutex);
 
                while(!mQueue.size())
                {
-                       g_cond_wait(&cond, mutex);
+                       g_cond_wait(&cond, &mutex);
                }
 
                auto itr = mQueue.begin();
@@ -61,24 +66,24 @@ public:
 
                mQueue.erase(itr);
 
-               g_mutex_unlock(mutex);
+               g_mutex_unlock(&mutex);
 
                return item;
        }
 
        void append(T item)
        {
-               g_mutex_lock(mutex);
+               g_mutex_lock(&mutex);
 
                g_cond_signal(&cond);
 
                mQueue.push_back(item);
 
-               g_mutex_unlock(mutex);
+               g_mutex_unlock(&mutex);
        }
 
 private:
-       GMutex mutex;
+       GMutex mutex;
        GCond cond;
        std::vector<T> mQueue;
 };
@@ -110,6 +115,28 @@ public:
        Queue<DBObject*> queue;
 };
 
+class PlaybackShared
+{
+public:
+       PlaybackShared(AbstractRoutingEngine* re, std::string u)
+               :routingEngine(re),uuid(u) {}
+       ~PlaybackShared()
+       {
+               for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
+               {
+                       DBObject* obj = *itr;
+
+                       delete obj;
+               }
+
+               playbackQueue.clear();
+       }
+
+       AbstractRoutingEngine* routingEngine;
+       std::list<DBObject*> playbackQueue;
+       std::string uuid;
+};
+
 class DatabaseSink : public AbstractSource
 {
 
@@ -127,11 +154,15 @@ public:
        virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
        virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        virtual PropertyList supported();
-       int supportedOperations() { return GetRanged}
+       int supportedOperations() { return GetRanged | Get | Set;}
 
 private: //methods:
 
        void parseConfig();
+       void stopDb();
+       void startDb();
+       void startPlayback();
+       void initDb();
 
 private:
        PropertyList mSubscriptions;
@@ -141,6 +172,9 @@ private:
        std::string tablename;
        std::string tablecreate;
        std::list<VehicleProperty::Property> propertiesToSubscribeTo;
+       PropertyList mSupported;
+       bool playback;
+       PlaybackShared* playbackShared;
 };
 
 class DatabaseSinkManager: public AbstractSinkManager
@@ -151,6 +185,8 @@ public:
        {
                new DatabaseSink(routingEngine, config);
                VehicleProperty::registerProperty(DatabaseLoggingProperty, [](){return new BasicPropertyType<bool>(false);});
+               VehicleProperty::registerProperty(DatabasePlaybackProperty, [](){return new BasicPropertyType<bool>(false);});
+VehicleProperty::registerProperty(DatabaseFileProperty, [](){return new StringPropertyType("out.ogg");});
        }
 };