{
AbstractSource* src = (*itr);
PropertyList properties = src->supported();
- if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+ if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Get)
{
src->getPropertyAsync(reply);
}
{
AbstractSource* src = (*itr);
PropertyList properties = src->supported();
- if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+ if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::GetRanged)
{
src->getRangePropertyAsync(reply);
}
{
AbstractSource* src = (*itr);
PropertyList properties = src->supported();
- if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property))
+ if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Set)
{
return src->setProperty(request);
}
}
+
+ DebugOut(0)<<"Error: setProperty opration failed"<<endl;
+ return NULL;
}
void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self)
--- /dev/null
+{
+ "sources" : [
+ {
+ "name" : "ExampleSouce",
+ "path" : "/usr/lib/automotive-message-broker/examplesourceplugin.so"
+ }
+ ],
+ "sinks": [
+ {
+ "name" : "Database",
+ "path" : "/usr/lib/automotive-message-broker/databasesinkplugin.so"
+ },
+ {
+ "name" : "Example Sink",
+ "path" : "/usr/lib/automotive-message-broker/examplesinkplugin.so"
+ }
+ ]
+}
+
{
public:
AsyncRangePropertyRequest()
- :begin(0), end(0)
+ :timeBegin(0), timeEnd(0), sequenceBegin(-1), sequenceEnd(-1)
{
}
{
this->property = request.property;
this->completed = request.completed;
- this->begin = request.begin;
- this->end = request.end;
+ this->timeBegin = request.timeBegin;
+ this->timeEnd = request.timeEnd;
+ this->sequenceBegin = request.sequenceBegin;
+ this->sequenceEnd = request.sequenceEnd;
}
VehicleProperty::Property property;
GetRangedPropertyCompletedSignal completed;
- double begin;
- double end;
+ double timeBegin;
+ double timeEnd;
+ int32_t sequenceBegin;
+ int32_t sequenceEnd;
};
class AsyncRangePropertyReply: public AsyncRangePropertyRequest
typedef list<AbstractSource*> SourceList;
+
+
class AbstractSource: public AbstractSink
{
public:
+ enum Operations {
+ Get = 0x01,
+ Set = 0x02,
+ GetRanged = 0x04
+ };
+
AbstractSource(AbstractRoutingEngine* engine, map<string, string> config);
virtual ~AbstractSource();
virtual void subscribeToPropertyChanges(VehicleProperty::Property property) = 0;
virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property) = 0;
virtual PropertyList supported() = 0;
+
+ virtual int supportedOperations() = 0;
protected:
#include "sqlitedatabase.h"
#include "sqlitequery.h"
+#include "debugout.h"
#include <string>
#include <sstream>
#include <stdio.h>
virtual ~BaseDB()
{
- printf("BaseDB: Destroying db object. Table: %s",table.c_str());
+ DebugOut()<<"BaseDB: Destroying db object. Table: "<<table<<endl;
delete q;
delete db;
}
virtual void
init(string dbname, string tablename, string tablestring)
{
- printf("BaseDB: Initializing db object. Table: %s",tablename.c_str());
+ DebugOut()<<"BaseDB: Initializing db object. Table: "<<tablename.c_str()<<endl;
tableString = tablestring;
db = new sqlitedatabase();
db->init(dbname);
- printf("BaseDB: Using db/db-file: %s",dbname.c_str());
+ DebugOut()<<"BaseDB: Using db/db-file: "<<dbname.c_str()<<endl;
if(! db->Connected())
{
- printf("BaseDB: database not found %s",dbname.c_str());
+ DebugOut(0)<<"BaseDB: database not found "<<dbname<<endl;
+ throw -1;
}
- q = new sqlitequery();;
+ q = new sqlitequery();
q->init(db);
virtual void
reloadTable()
{
- printf("BaseDB: reloading table %s",table.c_str());
+ DebugOut()<<"BaseDB: reloading table "<<table<<endl;
dropTable();
createTable();
}
{
bool exists=false;
string query = "SELECT * FROM "+table+" LIMIT 0,1";
- printf("BaseDB: checking for existing table with %s",query.c_str());
+ DebugOut()<<"BaseDB: checking for existing table with "<<query.c_str()<<endl;
q->getResult(query);
int numrows = q->numRows();
if(numrows <= 0 )
exists = false;
else exists = true;
- printf("BaseDB: Table '%s' exists? %d because %d rows where found.", table.c_str(), exists, numrows);
+ DebugOut()<<"BaseDB: Table '"<<table<<"' exists? "<<exists<<" because "<<numrows<<" rows where found."<<endl;
q->freeResult();
return exists;
}
}
endquery<<" )";
query+=" )"+endquery.str();
- printf("BaseDB: %s",query.c_str());
+ DebugOut()<<"BaseDB: "<<query<<endl;
q->execute(query);
}
endquery<<"'"<<fixInvalids(tempval.str())<<"'";
endquery<<" )";
query+=" )"+endquery.str();
- printf("BaseDB: %s",query.c_str());
+ DebugOut()<<"BaseDB: "<<query<<endl;
q->execute(query);
}
return filename;
}
+ vector<vector<string> > select(string query)
+ {
+ DebugOut()<<query<<endl;
+
+ vector<vector<string>> dataMap;
+
+ q->getResult(query);
+
+ if(q->numRows() <= 0)
+ {
+ q->freeResult();
+ return dataMap;
+ }
+
+ int i=0;
+
+ while(q->fetchRow())
+ {
+ string v;
+ dataMap.push_back(vector<string>());
+
+ while((v = q->getStr()) != "")
+ {
+ dataMap[i].push_back(v);
+ }
+ i++;
+ }
+
+ q->freeResult();
+
+ return dataMap;
+
+ }
+
protected:
void
#include "databasesink.h"
+#include "abstractroutingengine.h"
+
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+ return new DatabaseSinkManager(routingengine, config);
+}
DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config)
- :AbstractSink(engine,config)
+ :AbstractSource(engine,config)
{
+ databaseName = "storage";
+ tablename = "data";
+ tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)";
shared = new Shared;
- shared->db->init("storage","data","CREATE TABLE IF NOT EXISTS vehicledata (key TEXT, value BLOB, time REAL, source TEXT");
+ shared->db->init(databaseName, tablename, tablecreate);
+
+ engine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
+ engine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
+
+ PropertyList props;
+ props.push_back(VehicleProperty::EngineSpeed);
+ props.push_back(VehicleProperty::VehicleSpeed);
+
+ engine->setSupported(supported(),this);
auto cb = [](gpointer data)
{
DatabaseSink::~DatabaseSink()
{
+ DBObject* obj = new DBObject();
+ obj->quit = true;
-}
-
+ shared->queue.append(obj);
-PropertyList DatabaseSink::subscriptions()
-{
+ g_thread_join(thread);
+ delete shared;
}
}
+PropertyList DatabaseSink::supported()
+{
+ PropertyList props;
+
+ props.push_back(VehicleProperty::EngineSpeed);
+ props.push_back(VehicleProperty::VehicleSpeed);
+
+ return props;
+}
void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid)
{
obj->source = uuid;
obj->time = value->timestamp;
obj->sequence = value->sequence;
+
+ shared->queue.append(obj);
}
{
return "9f88156e-cb92-4472-8775-9c08addf50d3";
}
+
+void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply)
+{
+
+}
+
+void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply)
+{
+ BaseDB * db = new BaseDB();
+ db->init(databaseName, tablename, tablecreate);
+
+ ostringstream query;
+ query.precision(15);
+
+ query<<"SELECT * from "<<tablename<<" WHERE ";
+
+ if(reply->timeBegin && reply->timeEnd)
+ {
+ query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd;
+ }
+
+ if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0)
+ {
+ query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd;
+ }
+
+ std::vector<std::vector<string>> data = db->select(query.str());
+
+ for(auto i=0;i<data.size();i++)
+ {
+ for(auto n=0;n<data[i].size();n++)
+ cout<<"Data: "<<data[i][n]<<endl;
+ }
+
+ delete db;
+}
+
+AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request)
+{
+ AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+ reply->success = false;
+ return reply;
+}
+
+void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property )
+{
+
+}
+
+void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property )
+{
+}
#define DATABASESINK_H
#include "abstractsink.h"
+#include "abstractsource.h"
#include "basedb.hpp"
#include <glib.h>
g_cond_signal(&cond);
- mQueue.append(item);
+ mQueue.push_back(item);
g_mutex_unlock(mutex);
}
class DBObject {
public:
- DBObject(): time(0),quit(false) {}
+ DBObject(): time(0), sequence(0), quit(false) {}
std::string key;
std::string value;
std::string source;
Queue<DBObject*> queue;
};
-class DatabaseSink : public AbstractSink
+class DatabaseSink : public AbstractSource
{
public:
DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config);
~DatabaseSink();
- virtual PropertyList subscriptions();
virtual void supportedChanged(PropertyList supportedProperties);
virtual void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid);
virtual std::string uuid();
+ ///source role:
+ virtual void getPropertyAsync(AsyncPropertyReply *reply);
+ virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
+ virtual AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
+ virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
+ virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
+ virtual PropertyList supported();
+ int supportedOperations() { return GetRanged; }
+
private:
PropertyList mSubscriptions;
Shared *shared;
GThread* thread;
-
+ std::string databaseName;
+ std::string tablename;
+ std::string tablecreate;
};
class DatabaseSinkManager: public AbstractSinkManager
return props;
}
+int ExampleSourcePlugin::supportedOperations()
+{
+ return Get | Set | GetRanged;
+}
+
void ExampleSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property)
{
mRequests.remove(property);
void subscribeToPropertyChanges(VehicleProperty::Property property);
void unsubscribeToPropertyChanges(VehicleProperty::Property property);
PropertyList supported();
+
+ int supportedOperations();
void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {}
void supportedChanged(PropertyList) {}
routingEngine->getPropertyAsync(batteryVoltageRequest);
- boost::posix_time::ptime start(boost::gregorian::date(2012,10,20));
- boost::posix_time::ptime end(boost::gregorian::date(2012,10,21));
-
AsyncRangePropertyRequest vehicleSpeedFromLastWeek;
- tm tmStart = boost::posix_time::to_tm(start);
- tm tmEnd = boost::posix_time::to_tm(end);
- vehicleSpeedFromLastWeek.begin = mktime(&tmStart);
- vehicleSpeedFromLastWeek.end = mktime(&tmEnd);
+
+ vehicleSpeedFromLastWeek.timeBegin = 1354233906.54099;
+ vehicleSpeedFromLastWeek.timeEnd = 1354234153.03318;
vehicleSpeedFromLastWeek.property = VehicleProperty::VehicleSpeed;
vehicleSpeedFromLastWeek.completed = [](AsyncRangePropertyReply* reply)
{
{
return m_supportedProperties;
}
+
+int OBD2Source::supportedOperations()
+{
+ return Get | Set;
+}
+
extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
{
return new OBD2Source(routingengine, config);
void subscribeToPropertyChanges(VehicleProperty::Property property);
void unsubscribeToPropertyChanges(VehicleProperty::Property property);
PropertyList supported();
+
+ int supportedOperations();
+
PropertyList queuedRequests;
bool clientConnected;
PropertyList activeRequests;
{
AsyncRangePropertyRequest rangedRequest;
- rangedRequest.begin = start;
- rangedRequest.end = end;
+ rangedRequest.timeBegin = start;
+ rangedRequest.timeEnd = end;
if (property == "running_status_speedometer")
{
return m_supportedProperties;
}
+int WebSocketSource::supportedOperations()
+{
+ /// TODO: need to do this correctly based on what the host supports.
+ return Get | Set;
+}
+
string WebSocketSource::uuid()
{
return "d293f670-f0b3-11e1-aff1-0800200c9a66";
void subscribeToPropertyChanges(VehicleProperty::Property property);
void unsubscribeToPropertyChanges(VehicleProperty::Property property);
PropertyList supported();
+
+ int supportedOperations();
+
libwebsocket *clientsocket;
PropertyList queuedRequests;
bool clientConnected;
return props;
}
+int WheelSourcePlugin::supportedOperations()
+{
+ return Get | Set;
+}
+
void WheelSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property)
{
mRequests.erase(property);
void subscribeToPropertyChanges(VehicleProperty::Property property);
void unsubscribeToPropertyChanges(VehicleProperty::Property property);
PropertyList supported();
+
+ int supportedOperations();
void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {}
void supportedChanged(PropertyList) {}