added frequency option to database plugin. fixed up zone a bit.
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.h
1 /*
2     Copyright (C) 2012  Intel Corporation
3
4     This library is free software; you can redistribute it and/or
5     modify it under the terms of the GNU Lesser General Public
6     License as published by the Free Software Foundation; either
7     version 2.1 of the License, or (at your option) any later version.
8
9     This library is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12     Lesser General Public License for more details.
13
14     You should have received a copy of the GNU Lesser General Public
15     License along with this library; if not, write to the Free Software
16     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #ifndef DATABASESINK_H
21 #define DATABASESINK_H
22
23 #include "abstractsink.h"
24 #include "abstractsource.h"
25 #include "basedb.hpp"
26 #include "listplusplus.h"
27
28 #include <glib.h>
29
30 #include <functional>
31 #include <unordered_map>
32
33 const std::string DatabaseLogging = "DatabaseLogging";
34 const std::string DatabasePlayback = "DatabasePlayback";
35 const std::string DatabaseFile = "DatabaseFile";
36
37 template <typename T>
38 class Queue
39 {
40 public:
41         Queue()
42         {
43                 g_mutex_init(&mutex);
44                 g_cond_init(&cond);
45         }
46         ~Queue()
47         {
48
49         }
50
51         int count()
52         {
53                 g_mutex_lock(&mutex);
54                 int ret = mQueue.count();
55                 g_mutex_unlock(&mutex);
56
57                 return ret;
58         }
59
60         T pop()
61         {
62                 g_mutex_lock(&mutex);
63
64                 while(!mQueue.size())
65                 {
66                         g_cond_wait(&cond, &mutex);
67                 }
68
69                 auto itr = mQueue.begin();
70
71                 T item = *itr;
72
73                 mQueue.erase(itr);
74
75                 g_mutex_unlock(&mutex);
76
77                 return item;
78         }
79
80         void append(T item)
81         {
82                 g_mutex_lock(&mutex);
83
84                 g_cond_signal(&cond);
85
86                 mQueue.push_back(item);
87
88                 g_mutex_unlock(&mutex);
89         }
90
91 private:
92         GMutex mutex;
93         GCond cond;
94         std::vector<T> mQueue;
95 };
96
97 template <typename T>
98 class UniqueQueue
99 {
100 public:
101         UniqueQueue()
102         {
103                 g_mutex_init(&mutex);
104                 g_cond_init(&cond);
105         }
106         ~UniqueQueue()
107         {
108
109         }
110
111         int count()
112         {
113                 g_mutex_lock(&mutex);
114                 int ret = mQueue.count();
115                 g_mutex_unlock(&mutex);
116
117                 return ret;
118         }
119
120         T pop()
121         {
122                 g_mutex_lock(&mutex);
123
124                 while(!mQueue.size())
125                 {
126                         g_cond_wait(&cond, &mutex);
127                 }
128
129                 auto itr = mQueue.begin();
130
131                 T item = (*itr);
132
133                 mQueue.erase(itr);
134
135                 g_mutex_unlock(&mutex);
136
137                 return item;
138         }
139
140         void append(T item)
141         {
142                 g_mutex_lock(&mutex);
143
144                 g_cond_signal(&cond);
145
146                 if(contains(mQueue, item))
147                 {
148                         /// remove old one.  We only want the freshest of values
149                         mQueue.erase(std::find(mQueue.begin(), mQueue.end(), item));
150                 }
151                 mQueue.push_back(item);
152
153                 g_mutex_unlock(&mutex);
154         }
155
156 private:
157         GMutex mutex;
158         GCond cond;
159         std::vector<T> mQueue;
160 };
161
162 class DBObject {
163 public:
164         DBObject(): zone(0), time(0), sequence(0), quit(false) {}
165         std::string key;
166         std::string value;
167         std::string source;
168         int32_t zone;
169         double time;
170         int32_t sequence;
171         bool quit;
172
173         bool operator ==(const DBObject & other)
174         {
175                 return (key == other.key && source == other.source && zone == other.zone);
176         }
177
178         bool operator != (const DBObject & other)
179         {
180                 return (*this == other) == false;
181         }
182 };
183
184 class Shared
185 {
186 public:
187         Shared()
188         {
189                 db = new BaseDB;
190         }
191         ~Shared()
192         {
193                 delete db;
194         }
195
196         BaseDB * db;
197         UniqueQueue<DBObject> queue;
198 };
199
200 class PlaybackShared
201 {
202 public:
203         PlaybackShared(AbstractRoutingEngine* re, std::string u, uint playbackMult)
204                 :routingEngine(re),uuid(u),playBackMultiplier(playbackMult),stop(false) {}
205         ~PlaybackShared()
206         {
207                 for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
208                 {
209                         DBObject obj = *itr;
210                 }
211
212                 playbackQueue.clear();
213         }
214
215         AbstractRoutingEngine* routingEngine;
216         std::list<DBObject> playbackQueue;
217         uint playBackMultiplier;
218         std::string uuid;
219         bool stop;
220 };
221
222 class DatabaseSink : public AbstractSource
223 {
224
225 public:
226         DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config);
227         ~DatabaseSink();
228         virtual void supportedChanged(PropertyList supportedProperties);
229         virtual void propertyChanged(AbstractPropertyType *value);
230         const std::string uuid();
231
232         ///source role:
233         virtual void getPropertyAsync(AsyncPropertyReply *reply);
234         virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
235         virtual AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
236         virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
237         virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
238         virtual PropertyList supported();
239         int supportedOperations() { return GetRanged | Get | Set;}
240
241         PropertyInfo getPropertyInfo(VehicleProperty::Property property);
242
243 private: //methods:
244
245         void parseConfig();
246         void stopDb();
247         void startDb();
248         void startPlayback();
249         void initDb();
250         void setPlayback(bool v);
251         void setLogging(bool b);
252         void setDatabaseFileName(std::string filename);
253
254 private:
255         PropertyList mSubscriptions;
256         Shared *shared;
257         GThread* thread;
258         std::string databaseName;
259         std::string tablename;
260         std::string tablecreate;
261         std::list<VehicleProperty::Property> propertiesToSubscribeTo;
262         PropertyList mSupported;
263         bool playback;
264         PlaybackShared* playbackShared;
265         uint playbackMultiplier;
266 };
267
268 PROPERTYTYPEBASIC(DatabaseLogging, bool)
269 PROPERTYTYPEBASIC(DatabasePlayback, bool)
270 PROPERTYTYPE(DatabaseFile, DatabaseFileType, StringPropertyType, std::string)
271
272
273 class DatabaseSinkManager: public AbstractSinkManager
274 {
275 public:
276         DatabaseSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
277         :AbstractSinkManager(engine, config)
278         {
279                 new DatabaseSink(routingEngine, config);
280                 VehicleProperty::registerProperty(DatabaseLogging, [](){return new DatabaseLoggingType(false);});
281                 VehicleProperty::registerProperty(DatabasePlayback, [](){return new DatabasePlaybackType(false);});
282                 VehicleProperty::registerProperty(DatabaseFile, [](){return new DatabaseFileType("storage");});
283         }
284 };
285
286 #endif // DATABASESINK_H