09f62c64869338a2c3dd083796bea2edfe5edb01
[profile/ivi/automotive-message-broker.git] / plugins / database / databasesink.h
1 /*
2     Copyright (C) 2012  Intel Corporation
3
4     This library is free software; you can redistribute it and/or
5     modify it under the terms of the GNU Lesser General Public
6     License as published by the Free Software Foundation; either
7     version 2.1 of the License, or (at your option) any later version.
8
9     This library is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12     Lesser General Public License for more details.
13
14     You should have received a copy of the GNU Lesser General Public
15     License along with this library; if not, write to the Free Software
16     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #ifndef DATABASESINK_H
21 #define DATABASESINK_H
22
#include "abstractsink.h"
#include "basedb.hpp"

#include <glib.h>

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
27
/**
 * Blocking FIFO queue for handing items from producer threads to a consumer.
 *
 * append() pushes an item at the back and wakes one waiting consumer; pop()
 * blocks until at least one item is available and removes it from the front.
 * Every access to the underlying container is serialized by an internal mutex.
 *
 * The synchronization primitives are standard-library objects owned by value,
 * so they are always initialized on construction and released on destruction.
 * The queue is deliberately non-copyable: a mutex and a condition variable
 * cannot be meaningfully duplicated.
 */
template <typename T>
class Queue
{
public:
        Queue() = default;

        Queue(const Queue&) = delete;
        Queue& operator=(const Queue&) = delete;

        /**
         * Returns the number of items currently stored in the queue.
         * The value may already be stale when the caller uses it if other
         * threads are appending or popping concurrently.
         */
        int count() const
        {
                std::lock_guard<std::mutex> lock(mMutex);
                return static_cast<int>(mQueue.size());
        }

        /**
         * Removes and returns the oldest item.
         * Blocks the calling thread while the queue is empty.
         */
        T pop()
        {
                std::unique_lock<std::mutex> lock(mMutex);

                // The predicate form re-checks the condition after every
                // wake-up, which guards against spurious wake-ups.
                mCond.wait(lock, [this] { return !mQueue.empty(); });

                T item = std::move(mQueue.front());
                mQueue.pop_front();

                return item;
        }

        /**
         * Appends item at the back of the queue and wakes one thread that is
         * blocked in pop(), if any.
         */
        void append(T item)
        {
                {
                        std::lock_guard<std::mutex> lock(mMutex);
                        mQueue.push_back(std::move(item));
                }

                // Notify after releasing the lock so the woken consumer does
                // not immediately block on the mutex again.
                mCond.notify_one();
        }

private:
        mutable std::mutex mMutex;     // mutable so that count() can be const
        std::condition_variable mCond; // signalled whenever an item is appended
        std::deque<T> mQueue;          // front = oldest item, back = newest
};
82
/**
 * Plain record that is passed through Queue<DBObject*>.
 *
 * NOTE(review): the code that fills and consumes these fields is not in this
 * header. Presumably key/value/source/time/sequence mirror the arguments of
 * DatabaseSink::propertyChanged and quit asks the consumer thread to stop --
 * confirm against databasesink.cpp.
 */
class DBObject {
public:
        // Every scalar member is given a defined value so that a freshly
        // constructed object never exposes indeterminate data.
        DBObject(): time(0), sequence(0), quit(false) {}
        std::string key;
        std::string value;
        std::string source;
        double time;
        uint32_t sequence;
        bool quit;
};
93
94 class Shared
95 {
96 public:
97         Shared()
98         {
99                 db = new BaseDB;
100         }
101
102         BaseDB * db;
103         Queue<DBObject*> queue;
104 };
105
/**
 * Sink plugin class derived from AbstractSink.
 *
 * Only declarations are visible in this header; the member functions are
 * defined elsewhere.
 * NOTE(review): presumably propertyChanged() queues DBObject items on
 * shared->queue and 'thread' drains them into shared->db -- confirm against
 * databasesink.cpp.
 */
class DatabaseSink : public AbstractSink
{

public:
        DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config);
        ~DatabaseSink();
        virtual PropertyList subscriptions();
        virtual void supportedChanged(PropertyList supportedProperties);
        virtual void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid, double timestamp, uint32_t sequence);
        virtual std::string uuid();

private:
        PropertyList mSubscriptions;
        // Raw pointers; this header does not show who allocates or frees them.
        Shared *shared;
        GThread* thread;

};
123
124 class DatabaseSinkManager: public AbstractSinkManager
125 {
126 public:
127         DatabaseSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
128         :AbstractSinkManager(engine, config)
129         {
130                 new DatabaseSink(routingEngine, config);
131         }
132 };
133
134 #endif // DATABASESINK_H