database plugin works 35/2535/1
authorKevron Rees <tripzero.kev@gmail.com>
Fri, 30 Nov 2012 01:01:12 +0000 (17:01 -0800)
committerKevron Rees <tripzero.kev@gmail.com>
Fri, 30 Nov 2012 01:01:12 +0000 (17:01 -0800)
17 files changed:
ambd/core.cpp
examples/databaseconfig [new file with mode: 0644]
lib/abstractroutingengine.h
lib/abstractsource.h
plugins/database/basedb.hpp
plugins/database/databasesink.cpp
plugins/database/databasesink.h
plugins/exampleplugin.cpp
plugins/exampleplugin.h
plugins/examplesink.cpp
plugins/obd2plugin/obd2source.cpp
plugins/obd2plugin/obd2source.h
plugins/websocketsink/websocketsinkmanager.cpp
plugins/websocketsourceplugin/websocketsource.cpp
plugins/websocketsourceplugin/websocketsource.h
plugins/wheel/wheelplugin.cpp
plugins/wheel/wheelplugin.h

index 0107ee9..4148c77 100644 (file)
@@ -180,7 +180,7 @@ AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
        {
                AbstractSource* src = (*itr);
                PropertyList properties = src->supported();
-               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Get)
                {
                        src->getPropertyAsync(reply);
                }
@@ -197,7 +197,7 @@ AsyncRangePropertyReply *Core::getRangePropertyAsync(AsyncRangePropertyRequest r
        {
                AbstractSource* src = (*itr);
                PropertyList properties = src->supported();
-               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::GetRanged)
                {
                        src->getRangePropertyAsync(reply);
                }
@@ -212,11 +212,14 @@ AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request)
        {
                AbstractSource* src = (*itr);
                PropertyList properties = src->supported();
-               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Set)
                {
                        return src->setProperty(request);
                }
        }
+
+       DebugOut(0)<<"Error: setProperty opration failed"<<endl;
+       return NULL;
 }
 
 void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self)
diff --git a/examples/databaseconfig b/examples/databaseconfig
new file mode 100644 (file)
index 0000000..0615022
--- /dev/null
@@ -0,0 +1,19 @@
+{
+       "sources" : [ 
+               {
+                       "name" : "ExampleSouce",
+                       "path" : "/usr/lib/automotive-message-broker/examplesourceplugin.so"
+               }
+       ],
+       "sinks": [
+               {
+                       "name" : "Database",
+                       "path" : "/usr/lib/automotive-message-broker/databasesinkplugin.so"
+               },
+               {
+                       "name" : "Example Sink",
+                       "path" : "/usr/lib/automotive-message-broker/examplesinkplugin.so"
+               }
+       ]
+}
+
index f45d846..645f7f1 100644 (file)
@@ -100,7 +100,7 @@ class AsyncRangePropertyRequest
 {
 public:
        AsyncRangePropertyRequest()
-               :begin(0), end(0)
+               :timeBegin(0), timeEnd(0), sequenceBegin(-1), sequenceEnd(-1)
        {
 
        }
@@ -110,14 +110,18 @@ public:
        {
                this->property = request.property;
                this->completed = request.completed;
-               this->begin = request.begin;
-               this->end = request.end;
+               this->timeBegin = request.timeBegin;
+               this->timeEnd = request.timeEnd;
+               this->sequenceBegin = request.sequenceBegin;
+               this->sequenceEnd = request.sequenceEnd;
        }
 
        VehicleProperty::Property property;
        GetRangedPropertyCompletedSignal completed;
-       double begin;
-       double end;
+       double timeBegin;
+       double timeEnd;
+       int32_t sequenceBegin;
+       int32_t sequenceEnd;
 };
 
 class AsyncRangePropertyReply: public AsyncRangePropertyRequest
index d4c1e52..10ff01b 100644 (file)
@@ -35,10 +35,18 @@ class AbstractSource;
 
 typedef list<AbstractSource*> SourceList;
 
+
+
 class AbstractSource: public AbstractSink
 {
 
 public:
+       enum Operations {
+               Get = 0x01,
+               Set = 0x02,
+               GetRanged = 0x04
+       };
+
        AbstractSource(AbstractRoutingEngine* engine, map<string, string> config);
        virtual ~AbstractSource();
        
@@ -50,6 +58,8 @@ public:
        virtual void subscribeToPropertyChanges(VehicleProperty::Property property) = 0;
        virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property) = 0;
        virtual PropertyList supported() = 0;
+
+       virtual int supportedOperations() = 0;
        
 
 protected:
index a49872c..042b413 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "sqlitedatabase.h"
 #include "sqlitequery.h"
+#include "debugout.h"
 #include <string>
 #include <sstream>
 #include <stdio.h>
@@ -57,7 +58,7 @@ public:
        
        virtual ~BaseDB()
        {
-               printf("BaseDB: Destroying db object. Table: %s",table.c_str());
+               DebugOut()<<"BaseDB: Destroying db object. Table: "<<table<<endl;
                delete q;
                delete db;
        }
@@ -74,20 +75,21 @@ public:
        virtual void
        init(string dbname, string tablename, string tablestring)
        {
-               printf("BaseDB: Initializing db object. Table: %s",tablename.c_str());
+               DebugOut()<<"BaseDB: Initializing db object. Table: "<<tablename.c_str()<<endl;
                tableString = tablestring;
                
                db = new sqlitedatabase();
                
                db->init(dbname);
                
-               printf("BaseDB: Using db/db-file: %s",dbname.c_str());
+               DebugOut()<<"BaseDB: Using db/db-file: "<<dbname.c_str()<<endl;
                
                if(! db->Connected())
                {
-                       printf("BaseDB: database not found %s",dbname.c_str());
+                       DebugOut(0)<<"BaseDB: database not found "<<dbname<<endl;
+                       throw -1;
                }
-               q = new sqlitequery();;
+               q = new sqlitequery();
                
                q->init(db);
 
@@ -97,7 +99,7 @@ public:
        virtual void
        reloadTable()
        {
-               printf("BaseDB: reloading table %s",table.c_str());
+               DebugOut()<<"BaseDB: reloading table "<<table<<endl;
                dropTable();
                createTable();
        }
@@ -106,14 +108,14 @@ public:
        {
                bool exists=false;
                string query = "SELECT * FROM "+table+" LIMIT 0,1";
-               printf("BaseDB: checking for existing table with %s",query.c_str());
+               DebugOut()<<"BaseDB: checking for existing table with "<<query.c_str()<<endl;
                q->getResult(query);
                int numrows = q->numRows();
                if(numrows <= 0 )
                        exists = false;
                else exists = true;
 
-               printf("BaseDB: Table '%s' exists? %d because %d rows where found.", table.c_str(), exists, numrows);
+               DebugOut()<<"BaseDB: Table '"<<table<<"' exists? "<<exists<<" because "<<numrows<<" rows where found."<<endl;
                q->freeResult();
                return exists;
        }
@@ -146,7 +148,7 @@ public:
                }
                endquery<<" )";
                query+=" )"+endquery.str();
-               printf("BaseDB: %s",query.c_str());
+               DebugOut()<<"BaseDB: "<<query<<endl;
                q->execute(query);
        }
        
@@ -163,7 +165,7 @@ public:
                endquery<<"'"<<fixInvalids(tempval.str())<<"'";
                endquery<<" )";
                query+=" )"+endquery.str();
-               printf("BaseDB: %s",query.c_str());
+               DebugOut()<<"BaseDB: "<<query<<endl;
                q->execute(query);
        }
        
@@ -263,6 +265,40 @@ public:
                return filename;
        }
        
+       vector<vector<string> > select(string query)
+       {
+               DebugOut()<<query<<endl;
+
+               vector<vector<string>> dataMap;
+
+               q->getResult(query);
+
+               if(q->numRows() <= 0)
+               {
+                       q->freeResult();
+                       return dataMap;
+               }
+
+               int i=0;
+
+               while(q->fetchRow())
+               {
+                       string v;
+                       dataMap.push_back(vector<string>());
+
+                       while((v = q->getStr()) != "")
+                       {
+                               dataMap[i].push_back(v);
+                       }
+                       i++;
+               }
+
+               q->freeResult();
+
+               return dataMap;
+
+       }
+
 protected:
        
        void
index abb7bef..742f5ac 100644 (file)
@@ -1,10 +1,28 @@
 #include "databasesink.h"
+#include "abstractroutingengine.h"
+
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+       return new DatabaseSinkManager(routingengine, config);
+}
 
 DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
-       :AbstractSink(engine,config)
+       :AbstractSource(engine,config)
 {
+       databaseName = "storage";
+       tablename = "data";
+       tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
        shared = new Shared;
-       shared->db->init("storage","data","CREATE TABLE IF NOT EXISTS vehicledata (key TEXT, value BLOB, time REAL, source TEXT");
+       shared->db->init(databaseName, tablename, tablecreate);
+
+       engine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
+       engine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
+
+       PropertyList props;
+       props.push_back(VehicleProperty::EngineSpeed);
+       props.push_back(VehicleProperty::VehicleSpeed);
+
+       engine->setSupported(supported(),this);
 
        auto cb = [](gpointer data)
        {
@@ -47,13 +65,14 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::
 
 DatabaseSink::~DatabaseSink()
 {
+       DBObject* obj = new DBObject();
+       obj->quit = true;
 
-}
-
+       shared->queue.append(obj);
 
-PropertyList DatabaseSink::subscriptions()
-{
+       g_thread_join(thread);
 
+       delete shared;
 }
 
 
@@ -62,6 +81,15 @@ void DatabaseSink::supportedChanged(PropertyList supportedProperties)
 
 }
 
+PropertyList DatabaseSink::supported()
+{
+       PropertyList props;
+
+       props.push_back(VehicleProperty::EngineSpeed);
+       props.push_back(VehicleProperty::VehicleSpeed);
+
+       return props;
+}
 
 void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
 {
@@ -71,6 +99,8 @@ void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractP
        obj->source = uuid;
        obj->time = value->timestamp;
        obj->sequence = value->sequence;
+
+       shared->queue.append(obj);
 }
 
 
@@ -78,3 +108,55 @@ std::string DatabaseSink::uuid()
 {
        return "9f88156e-cb92-4472-8775-9c08addf50d3";
 }
+
+void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
+{
+
+}
+
+void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
+{
+       BaseDB * db = new BaseDB();
+       db->init(databaseName, tablename, tablecreate);
+
+       ostringstream query;
+       query.precision(15);
+
+       query<<"SELECT * from "<<tablename<<" WHERE ";
+
+       if(reply->timeBegin && reply->timeEnd)
+       {
+               query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
+       }
+
+       if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
+       {
+               query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
+       }
+
+       std::vector<std::vector<string>> data = db->select(query.str());
+
+       for(auto i=0;i<data.size();i++)
+       {
+               for(auto n=0;n<data[i].size();n++)
+                       cout<<"Data: "<<data[i][n]<<endl;
+       }
+
+       delete db;
+}
+
+AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
+{
+       AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+       reply->success = false;
+       return reply;
+}
+
+void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
+{
+
+}
+
+void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
+{
+}
index 5884c81..2cdf208 100644 (file)
@@ -21,6 +21,7 @@
 #define DATABASESINK_H
 
 #include "abstractsink.h"
+#include "abstractsource.h"
 #include "basedb.hpp"
 
 #include <glib.h>
@@ -69,7 +70,7 @@ public:
 
                g_cond_signal(&cond);
 
-               mQueue.append(item);
+               mQueue.push_back(item);
 
                g_mutex_unlock(mutex);
        }
@@ -82,7 +83,7 @@ private:
 
 class DBObject {
 public:
-       DBObject(): time(0),quit(false) {}
+       DBObject(): time(0), sequence(0), quit(false) {}
        std::string key;
        std::string value;
        std::string source;
@@ -103,22 +104,32 @@ public:
        Queue<DBObject*> queue;
 };
 
-class DatabaseSink : public AbstractSink
+class DatabaseSink : public AbstractSource
 {
 
 public:
        DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config);
        ~DatabaseSink();
-       virtual PropertyList subscriptions();
        virtual void supportedChanged(PropertyList supportedProperties);
        virtual void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid);
        virtual std::string uuid();
 
+       ///source role:
+       virtual void getPropertyAsync(AsyncPropertyReply *reply);
+       virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
+       virtual AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
+       virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
+       virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
+       virtual PropertyList supported();
+       int supportedOperations() { return GetRanged; }
+
 private:
        PropertyList mSubscriptions;
        Shared *shared;
        GThread* thread;
-
+       std::string databaseName;
+       std::string tablename;
+       std::string tablecreate;
 };
 
 class DatabaseSinkManager: public AbstractSinkManager
index db7b95a..e73a05e 100644 (file)
@@ -155,6 +155,11 @@ PropertyList ExampleSourcePlugin::supported()
        return props;
 }
 
+int ExampleSourcePlugin::supportedOperations()
+{
+       return Get | Set | GetRanged;
+}
+
 void ExampleSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property)
 {
        mRequests.remove(property);
index 8f4327b..715f054 100644 (file)
@@ -37,6 +37,8 @@ public:
        void subscribeToPropertyChanges(VehicleProperty::Property property);
        void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        PropertyList supported();
+
+       int supportedOperations();
        
        void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {}
        void supportedChanged(PropertyList) {}
index dd531f5..e0e7bf7 100644 (file)
@@ -58,14 +58,10 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine, map<string, string> conf
 
        routingEngine->getPropertyAsync(batteryVoltageRequest);
 
-       boost::posix_time::ptime start(boost::gregorian::date(2012,10,20));
-       boost::posix_time::ptime end(boost::gregorian::date(2012,10,21));
-
        AsyncRangePropertyRequest vehicleSpeedFromLastWeek;
-       tm tmStart = boost::posix_time::to_tm(start);
-       tm tmEnd = boost::posix_time::to_tm(end);
-       vehicleSpeedFromLastWeek.begin = mktime(&tmStart);
-       vehicleSpeedFromLastWeek.end = mktime(&tmEnd);
+
+       vehicleSpeedFromLastWeek.timeBegin = 1354233906.54099;
+       vehicleSpeedFromLastWeek.timeEnd = 1354234153.03318;
        vehicleSpeedFromLastWeek.property = VehicleProperty::VehicleSpeed;
        vehicleSpeedFromLastWeek.completed = [](AsyncRangePropertyReply* reply)
        {
index 4170145..6161efd 100644 (file)
@@ -609,6 +609,12 @@ PropertyList OBD2Source::supported()
 {
        return m_supportedProperties;
 }
+
+int OBD2Source::supportedOperations()
+{
+       return Get | Set;
+}
+
 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
 {
        return new OBD2Source(routingengine, config);
index b9499db..ee1bf57 100644 (file)
@@ -134,6 +134,9 @@ public:
        void subscribeToPropertyChanges(VehicleProperty::Property property);
        void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        PropertyList supported();
+
+       int supportedOperations();
+
        PropertyList queuedRequests;
        bool clientConnected;
        PropertyList activeRequests;
index 05a5570..da018a3 100644 (file)
@@ -151,8 +151,8 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Vehicle
 {
        AsyncRangePropertyRequest rangedRequest;
 
-       rangedRequest.begin = start;
-       rangedRequest.end = end;
+       rangedRequest.timeBegin = start;
+       rangedRequest.timeEnd = end;
 
        if (property == "running_status_speedometer")
        {
index 3cfa094..0706ee6 100644 (file)
@@ -410,6 +410,12 @@ PropertyList WebSocketSource::supported()
        return m_supportedProperties;
 }
 
+int WebSocketSource::supportedOperations()
+{
+       /// TODO: need to do this correctly based on what the host supports.
+       return Get | Set;
+}
+
 string WebSocketSource::uuid()
 {
        return "d293f670-f0b3-11e1-aff1-0800200c9a66";
index 3eaaaa5..1295019 100644 (file)
@@ -40,6 +40,9 @@ public:
        void subscribeToPropertyChanges(VehicleProperty::Property property);
        void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        PropertyList supported();
+
+       int supportedOperations();
+
        libwebsocket *clientsocket;
        PropertyList queuedRequests;
        bool clientConnected;
index b8f0e50..d8d87c1 100644 (file)
@@ -167,6 +167,11 @@ PropertyList WheelSourcePlugin::supported()
        return props;
 }
 
+int WheelSourcePlugin::supportedOperations()
+{
+       return Get | Set;
+}
+
 void WheelSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property)
 {
        mRequests.erase(property);
index ab7e662..30e7b87 100644 (file)
@@ -40,6 +40,8 @@ public:
        void subscribeToPropertyChanges(VehicleProperty::Property property);
        void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        PropertyList supported();
+
+       int supportedOperations();
        
        void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {}
        void supportedChanged(PropertyList) {}