2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #ifndef DATABASESINK_H
21 #define DATABASESINK_H
23 #include "abstractsink.h"
24 #include "abstractsource.h"
26 #include "listplusplus.h"
31 #include <unordered_map>
33 const std::string DatabaseLogging = "DatabaseLogging";
34 const std::string DatabasePlayback = "DatabasePlayback";
35 const std::string DatabaseFile = "DatabaseFile";
54 int ret = mQueue.count();
55 g_mutex_unlock(&mutex);
66 g_cond_wait(&cond, &mutex);
69 auto itr = mQueue.begin();
75 g_mutex_unlock(&mutex);
86 mQueue.push_back(item);
88 g_mutex_unlock(&mutex);
94 std::vector<T> mQueue;
103 g_mutex_init(&mutex);
113 g_mutex_lock(&mutex);
114 int ret = mQueue.count();
115 g_mutex_unlock(&mutex);
122 g_mutex_lock(&mutex);
124 while(!mQueue.size())
126 g_cond_wait(&cond, &mutex);
129 auto itr = mQueue.begin();
135 g_mutex_unlock(&mutex);
142 g_mutex_lock(&mutex);
144 g_cond_signal(&cond);
146 if(contains(mQueue, item))
148 /// remove old one. We only want the freshest of values
149 mQueue.erase(std::find(mQueue.begin(), mQueue.end(), item));
151 mQueue.push_back(item);
153 g_mutex_unlock(&mutex);
159 std::vector<T> mQueue;
164 DBObject(): zone(0), time(0), sequence(0), quit(false) {}
173 bool operator ==(const DBObject & other)
175 return (key == other.key && source == other.source && zone == other.zone);
178 bool operator != (const DBObject & other)
180 return (*this == other) == false;
197 UniqueQueue<DBObject> queue;
203 PlaybackShared(AbstractRoutingEngine* re, std::string u, uint playbackMult)
204 :routingEngine(re),uuid(u),playBackMultiplier(playbackMult),stop(false) {}
207 for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
212 playbackQueue.clear();
215 AbstractRoutingEngine* routingEngine;
216 std::list<DBObject> playbackQueue;
217 uint playBackMultiplier;
222 class DatabaseSink : public AbstractSource
226 DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config);
228 virtual void supportedChanged(PropertyList supportedProperties);
229 virtual void propertyChanged(AbstractPropertyType *value);
230 const std::string uuid();
233 virtual void getPropertyAsync(AsyncPropertyReply *reply);
234 virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
235 virtual AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
236 virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
237 virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
238 virtual PropertyList supported();
239 int supportedOperations() { return GetRanged | Get | Set;}
241 PropertyInfo getPropertyInfo(VehicleProperty::Property property);
248 void startPlayback();
250 void setPlayback(bool v);
251 void setLogging(bool b);
252 void setDatabaseFileName(std::string filename);
255 PropertyList mSubscriptions;
258 std::string databaseName;
259 std::string tablename;
260 std::string tablecreate;
261 std::list<VehicleProperty::Property> propertiesToSubscribeTo;
262 PropertyList mSupported;
264 PlaybackShared* playbackShared;
265 uint playbackMultiplier;
268 PROPERTYTYPEBASIC(DatabaseLogging, bool)
269 PROPERTYTYPEBASIC(DatabasePlayback, bool)
270 PROPERTYTYPE(DatabaseFile, DatabaseFileType, StringPropertyType, std::string)
273 class DatabaseSinkManager: public AbstractSinkManager
276 DatabaseSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
277 :AbstractSinkManager(engine, config)
279 new DatabaseSink(routingEngine, config);
280 VehicleProperty::registerProperty(DatabaseLogging, [](){return new DatabaseLoggingType(false);});
281 VehicleProperty::registerProperty(DatabasePlayback, [](){return new DatabasePlaybackType(false);});
282 VehicleProperty::registerProperty(DatabaseFile, [](){return new DatabaseFileType("storage");});
286 #endif // DATABASESINK_H