From f0fe6565195d6c984166a283f314561656a40473 Mon Sep 17 00:00:00 2001 From: Kevron Rees Date: Thu, 13 Feb 2014 11:37:30 -0800 Subject: [PATCH] added frequency option to database plugin. fixed up zone a bit. --- examples/databaseconfig | 3 +- plugins/database/README | 9 +++- plugins/database/databasesink.cpp | 89 ++++++++++++++++++++++----------------- plugins/database/databasesink.h | 85 ++++++++++++++++++++++++++++++++++--- 4 files changed, 140 insertions(+), 46 deletions(-) diff --git a/examples/databaseconfig b/examples/databaseconfig index 0405063..45d7c60 100644 --- a/examples/databaseconfig +++ b/examples/databaseconfig @@ -12,7 +12,8 @@ "properties" : "{ 'properties' : ['VehicleSpeed','EngineSpeed'] }", "databaseFile" : "/tmp/storage", "startOnLoad" : "true", - "bufferLength" : "1" + "bufferLength" : "1", + "frequency" : "1" }, { "name" : "Example Sink", diff --git a/plugins/database/README b/plugins/database/README index d83cbd6..f380f0f 100644 --- a/plugins/database/README +++ b/plugins/database/README @@ -17,7 +17,8 @@ To use the Database plugin, add the following to the "sources" array in /etc/amb "properties" : "{ 'properties' : ['VehicleSpeed','EngineSpeed'] }", "startOnLoad" : "false", "playbackOnLoad" : "false", - "playbackMultiplier" : "1" + "playbackMultiplier" : "1", + "frequency" : "1" } Configuration Key Definitions: @@ -60,6 +61,12 @@ etc. Default: 1 +"frequency" +Frequency in Hz in which the database will write contents. Only the newest values +will be written. Other values are discarded. If bufferLength is not full,the +database will not be written to until it is. + +Default: 1 AMB Properties: diff --git a/plugins/database/databasesink.cpp b/plugins/database/databasesink.cpp index f4790be..bf388eb 100644 --- a/plugins/database/databasesink.cpp +++ b/plugins/database/databasesink.cpp @@ -3,6 +3,7 @@ #include "listplusplus.h" int bufferLength = 100; +int timeout=1000; extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map config) { @@ -22,22 +23,23 @@ void * cbFunc(gpointer data) while(1) { - DBObject* obj = shared->queue.pop(); + usleep(timeout*1000); - if( obj->quit ) + DBObject obj = shared->queue.pop(); + + if( obj.quit ) { - delete obj; break; } DictionaryList dict; - NameValuePair one("key", obj->key); - NameValuePair two("value", obj->value); - NameValuePair three("source", obj->source); - NameValuePair zone("zone", boost::lexical_cast(obj->zone)); - NameValuePair four("time", boost::lexical_cast(obj->time)); - NameValuePair five("sequence", boost::lexical_cast(obj->sequence)); + NameValuePair one("key", obj.key); + NameValuePair two("value", obj.value); + NameValuePair three("source", obj.source); + NameValuePair zone("zone", boost::lexical_cast(obj.zone)); + NameValuePair four("time", boost::lexical_cast(obj.time)); + NameValuePair five("sequence", boost::lexical_cast(obj.sequence)); dict.push_back(one); dict.push_back(two); @@ -59,7 +61,7 @@ void * cbFunc(gpointer data) shared->db->exec("END TRANSACTION"); insertList.clear(); } - delete obj; + //delete obj; } /// final flush of whatever is still in the queue: @@ -92,22 +94,23 @@ int getNextEvent(gpointer data) return 0; } - DBObject* obj = *itr; + DBObject obj = *itr; - AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value); + AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj.key,obj.value); if(value) { pbshared->routingEngine->updateProperty(value, pbshared->uuid); - value->timestamp = obj->time; - value->sequence = obj->sequence; - value->sourceUuid = obj->source; + value->timestamp = obj.time; + value->sequence = obj.sequence; + value->sourceUuid = obj.source; + value->zone = obj.zone; } if(++itr != pbshared->playbackQueue.end()) { - DBObject *o2 = *itr; - double t = o2->time - obj->time; + DBObject o2 = *itr; + double t = o2.time - obj.time; if(t > 0) g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared); @@ -117,7 +120,7 @@ int getNextEvent(gpointer data) pbshared->playbackQueue.remove(obj); DebugOut()<<"playback Queue size: "<playbackQueue.size()<(config["frequency"]); + timeout = 1000 / t; + }catch(...) + { + DebugOut(DebugOut::Error)<<"Failed to parse frequency: Invalid value "<quit = true; - - shared->queue.append(obj); - - g_thread_join(thread); -// g_thread_unref(thread); - delete shared; + stopDb(); } if(playbackShared) @@ -252,8 +262,8 @@ void DatabaseSink::stopDb() if(!shared) return; - DBObject *obj = new DBObject(); - obj->quit = true; + DBObject obj; + obj.quit = true; shared->queue.append(obj); g_thread_join(thread); @@ -312,12 +322,12 @@ void DatabaseSink::startPlayback() throw std::runtime_error("column mismatch in query"); } - DBObject* obj = new DBObject(); + DBObject obj; - obj->key = results[i][0]; - obj->value = results[i][1]; - obj->source = results[i][2]; - obj->time = boost::lexical_cast(results[i][3]); + obj.key = results[i][0]; + obj.value = results[i][1]; + obj.source = results[i][2]; + obj.time = boost::lexical_cast(results[i][3]); /// TODO: figure out why sequence is broken: @@ -390,12 +400,13 @@ void DatabaseSink::propertyChanged(AbstractPropertyType *value) routingEngine->setSupported(mSupported, this); } - DBObject* obj = new DBObject; - obj->key = property; - obj->value = value->toString(); - obj->source = value->sourceUuid; - obj->time = value->timestamp; - obj->sequence = value->sequence; + DBObject obj; + obj.key = property; + obj.value = value->toString(); + obj.source = value->sourceUuid; + obj.time = value->timestamp; + obj.sequence = value->sequence; + obj.zone = value->zone; shared->queue.append(obj); } diff --git a/plugins/database/databasesink.h b/plugins/database/databasesink.h index 7e9006f..390fead 100644 --- a/plugins/database/databasesink.h +++ b/plugins/database/databasesink.h @@ -23,10 +23,12 @@ #include "abstractsink.h" #include "abstractsource.h" #include "basedb.hpp" +#include "listplusplus.h" #include #include +#include const std::string DatabaseLogging = "DatabaseLogging"; const std::string DatabasePlayback = "DatabasePlayback"; @@ -92,6 +94,71 @@ private: std::vector mQueue; }; +template +class UniqueQueue +{ +public: + UniqueQueue() + { + g_mutex_init(&mutex); + g_cond_init(&cond); + } + ~UniqueQueue() + { + + } + + int count() + { + g_mutex_lock(&mutex); + int ret = mQueue.count(); + g_mutex_unlock(&mutex); + + return ret; + } + + T pop() + { + g_mutex_lock(&mutex); + + while(!mQueue.size()) + { + g_cond_wait(&cond, &mutex); + } + + auto itr = mQueue.begin(); + + T item = (*itr); + + mQueue.erase(itr); + + g_mutex_unlock(&mutex); + + return item; + } + + void append(T item) + { + g_mutex_lock(&mutex); + + g_cond_signal(&cond); + + if(contains(mQueue, item)) + { + /// remove old one. We only want the freshest of values + mQueue.erase(std::find(mQueue.begin(), mQueue.end(), item)); + } + mQueue.push_back(item); + + g_mutex_unlock(&mutex); + } + +private: + GMutex mutex; + GCond cond; + std::vector mQueue; +}; + class DBObject { public: DBObject(): zone(0), time(0), sequence(0), quit(false) {} @@ -102,6 +169,16 @@ public: double time; int32_t sequence; bool quit; + + bool operator ==(const DBObject & other) + { + return (key == other.key && source == other.source && zone == other.zone); + } + + bool operator != (const DBObject & other) + { + return (*this == other) == false; + } }; class Shared @@ -117,7 +194,7 @@ public: } BaseDB * db; - Queue queue; + UniqueQueue queue; }; class PlaybackShared @@ -129,16 +206,14 @@ public: { for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++) { - DBObject* obj = *itr; - - delete obj; + DBObject obj = *itr; } playbackQueue.clear(); } AbstractRoutingEngine* routingEngine; - std::list playbackQueue; + std::list playbackQueue; uint playBackMultiplier; std::string uuid; bool stop; -- 2.7.4