#include "listplusplus.h"
int bufferLength = 100;
+int timeout=1000;
extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
{
while(1)
{
- DBObject* obj = shared->queue.pop();
+ usleep(timeout*1000);
- if( obj->quit )
+ DBObject obj = shared->queue.pop();
+
+ if( obj.quit )
{
- delete obj;
break;
}
DictionaryList<string> dict;
- NameValuePair<string> one("key", obj->key);
- NameValuePair<string> two("value", obj->value);
- NameValuePair<string> three("source", obj->source);
- NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj->zone));
- NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time));
- NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence));
+ NameValuePair<string> one("key", obj.key);
+ NameValuePair<string> two("value", obj.value);
+ NameValuePair<string> three("source", obj.source);
+ NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj.zone));
+ NameValuePair<string> four("time", boost::lexical_cast<string>(obj.time));
+ NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj.sequence));
dict.push_back(one);
dict.push_back(two);
shared->db->exec("END TRANSACTION");
insertList.clear();
}
- delete obj;
+ //delete obj;
}
/// final flush of whatever is still in the queue:
return 0;
}
- DBObject* obj = *itr;
+ DBObject obj = *itr;
- AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value);
+ AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj.key,obj.value);
if(value)
{
pbshared->routingEngine->updateProperty(value, pbshared->uuid);
- value->timestamp = obj->time;
- value->sequence = obj->sequence;
- value->sourceUuid = obj->source;
+ value->timestamp = obj.time;
+ value->sequence = obj.sequence;
+ value->sourceUuid = obj.source;
+ value->zone = obj.zone;
}
if(++itr != pbshared->playbackQueue.end())
{
- DBObject *o2 = *itr;
- double t = o2->time - obj->time;
+ DBObject o2 = *itr;
+ double t = o2.time - obj.time;
if(t > 0)
g_timeout_add((t*1000) / pbshared->playBackMultiplier, getNextEvent, pbshared);
pbshared->playbackQueue.remove(obj);
DebugOut()<<"playback Queue size: "<<pbshared->playbackQueue.size()<<endl;
- delete obj;
+ //delete obj;
return 0;
}
bufferLength = atoi(config["bufferLength"].c_str());
}
+ if(config.find("frequency") != config.end())
+ {
+ try
+ {
+ int t = boost::lexical_cast<int>(config["frequency"]);
+ timeout = 1000 / t;
+ }catch(...)
+ {
+ DebugOut(DebugOut::Error)<<"Failed to parse frequency: Invalid value "<<config["frequency"]<<endl;
+ }
+
+
+ }
+
if(config.find("properties") != config.end())
{
parseConfig();
{
if(shared)
{
- DBObject* obj = new DBObject();
- obj->quit = true;
-
- shared->queue.append(obj);
-
- g_thread_join(thread);
-// g_thread_unref(thread);
- delete shared;
+ stopDb();
}
if(playbackShared)
if(!shared)
return;
- DBObject *obj = new DBObject();
- obj->quit = true;
+ DBObject obj;
+ obj.quit = true;
shared->queue.append(obj);
g_thread_join(thread);
throw std::runtime_error("column mismatch in query");
}
- DBObject* obj = new DBObject();
+ DBObject obj;
- obj->key = results[i][0];
- obj->value = results[i][1];
- obj->source = results[i][2];
- obj->time = boost::lexical_cast<double>(results[i][3]);
+ obj.key = results[i][0];
+ obj.value = results[i][1];
+ obj.source = results[i][2];
+ obj.time = boost::lexical_cast<double>(results[i][3]);
/// TODO: figure out why sequence is broken:
routingEngine->setSupported(mSupported, this);
}
- DBObject* obj = new DBObject;
- obj->key = property;
- obj->value = value->toString();
- obj->source = value->sourceUuid;
- obj->time = value->timestamp;
- obj->sequence = value->sequence;
+ DBObject obj;
+ obj.key = property;
+ obj.value = value->toString();
+ obj.source = value->sourceUuid;
+ obj.time = value->timestamp;
+ obj.sequence = value->sequence;
+ obj.zone = value->zone;
shared->queue.append(obj);
}
#include "abstractsink.h"
#include "abstractsource.h"
#include "basedb.hpp"
+#include "listplusplus.h"
#include <glib.h>
#include <functional>
+#include <unordered_map>
const std::string DatabaseLogging = "DatabaseLogging";
const std::string DatabasePlayback = "DatabasePlayback";
std::vector<T> mQueue;
};
+template <typename T>
+class UniqueQueue
+{
+public:
+ UniqueQueue()
+ {
+ g_mutex_init(&mutex);
+ g_cond_init(&cond);
+ }
+ ~UniqueQueue()
+ {
+
+ }
+
+ int count()
+ {
+ g_mutex_lock(&mutex);
+ int ret = mQueue.count();
+ g_mutex_unlock(&mutex);
+
+ return ret;
+ }
+
+ T pop()
+ {
+ g_mutex_lock(&mutex);
+
+ while(!mQueue.size())
+ {
+ g_cond_wait(&cond, &mutex);
+ }
+
+ auto itr = mQueue.begin();
+
+ T item = (*itr);
+
+ mQueue.erase(itr);
+
+ g_mutex_unlock(&mutex);
+
+ return item;
+ }
+
+ void append(T item)
+ {
+ g_mutex_lock(&mutex);
+
+ g_cond_signal(&cond);
+
+ if(contains(mQueue, item))
+ {
+ /// remove old one. We only want the freshest of values
+ mQueue.erase(std::find(mQueue.begin(), mQueue.end(), item));
+ }
+ mQueue.push_back(item);
+
+ g_mutex_unlock(&mutex);
+ }
+
+private:
+ GMutex mutex;
+ GCond cond;
+ std::vector<T> mQueue;
+};
+
class DBObject {
public:
DBObject(): zone(0), time(0), sequence(0), quit(false) {}
double time;
int32_t sequence;
bool quit;
+
+ bool operator ==(const DBObject & other)
+ {
+ return (key == other.key && source == other.source && zone == other.zone);
+ }
+
+ bool operator != (const DBObject & other)
+ {
+ return (*this == other) == false;
+ }
};
class Shared
}
BaseDB * db;
- Queue<DBObject*> queue;
+ UniqueQueue<DBObject> queue;
};
class PlaybackShared
{
for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
{
- DBObject* obj = *itr;
-
- delete obj;
+ DBObject obj = *itr;
}
playbackQueue.clear();
}
AbstractRoutingEngine* routingEngine;
- std::list<DBObject*> playbackQueue;
+ std::list<DBObject> playbackQueue;
uint playBackMultiplier;
std::string uuid;
bool stop;