made some of the dbus API compliant with w3c auto-bg specification
[profile/ivi/automotive-message-broker.git] / lib / asyncqueue.hpp
1 /*
2         Copyright (C) 2014  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
#include <glib.h>

#include <abstractpropertytype.h>

#include <functional>
#include <mutex>
#include <stdexcept>
#include <unordered_set>
#include <utility>
25
26 namespace amb
27 {
28
/**
 * Mutex-guarded collection of pending items.
 *
 * Storage is a std::unordered_set, so despite the name:
 *  - items that compare equal under Pred are stored only once, and
 *  - pop() hands back whichever element the set's begin() iterator refers
 *    to; no FIFO (or any other) ordering is provided.
 *
 * Every public member takes the internal mutex for the duration of the call.
 *
 * @tparam T    item type; hashed with std::hash<T>.
 * @tparam Pred equality predicate handed to the underlying set.
 */
template <typename T, class Pred = std::equal_to<T> >
class Queue
{
public:
        Queue()
        {

        }
        virtual ~Queue()
        {

        }

        /**
         * @return the number of items currently stored.
         */
        int count()
        {
                std::lock_guard<std::mutex> lock(mutex);

                // The interface returns int; make the size_t -> int conversion explicit.
                return static_cast<int>(mQueue.size());
        }

        /**
         * Removes one item from the collection and returns a copy of it.
         *
         * @return the removed item.
         * @throws std::out_of_range if the collection is empty (previously this
         *         dereferenced end(), which is undefined behaviour).
         */
        T pop()
        {
                std::lock_guard<std::mutex> lock(mutex);

                auto itr = mQueue.begin();

                if (itr == mQueue.end())
                        throw std::out_of_range("amb::Queue::pop() called on an empty queue");

                // Set elements are const, so the item has to be copied out before erasing.
                T item = *itr;

                mQueue.erase(itr);

                return item;
        }

        /**
         * Adds @p item to the collection. If an equal item (per Pred) is already
         * stored, the set keeps the existing one and the call has no effect.
         */
        virtual void append(T item)
        {
                std::lock_guard<std::mutex> lock(mutex);

                // item is already our own copy; move it in rather than copying again.
                mQueue.insert(std::move(item));
        }

protected:
        std::mutex mutex;                                  ///< guards mQueue
        std::unordered_set<T,std::hash<T>, Pred> mQueue;   ///< the stored items
};
73
74 template <typename T, class Pred = std::equal_to<T> >
75 struct AsyncQueueSource{
76         GSource source;
77         Queue<T, Pred>* queue;
78         int minQueueSize;
79 };
80
81 template <typename T, class Pred = std::equal_to<T> >
82 class AsyncQueueWatcher
83 {
84 public:
85         typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
86         AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
87                 : callback(cb), mMaxQueueSize(queueSize)
88         {
89
90                 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
91                 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
92
93                 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
94                 watch->queue = queue;
95                 watch->minQueueSize = queueSize;
96
97                 gint p = G_PRIORITY_DEFAULT;
98
99                 if(priority == AbstractPropertyType::Normal)
100                         p = G_PRIORITY_DEFAULT;
101                 else if(priority == AbstractPropertyType::High)
102                         p = G_PRIORITY_HIGH;
103                 else if(priority == AbstractPropertyType::Low)
104                         p = G_PRIORITY_LOW;
105
106                 g_source_set_priority(source, p);
107                 g_source_set_callback(source, nullptr, this, nullptr);
108
109                 g_source_attach(source, nullptr);
110                 g_source_unref(source);
111         }
112
113         AsyncQueueWatcherCallback callback;
114
115
116 protected:
117         AsyncQueueWatcher(){}
118
119         int mMaxQueueSize;
120
121 private:
122
123         static gboolean prepare(GSource *source, gint *timeout)
124         {
125                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
126                 *timeout = -1;
127
128                 if (!s)
129                         return false;
130
131                 return s->queue->count() > s->minQueueSize;
132         }
133
134         static gboolean check(GSource *source)
135         {
136                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
137
138                 if (!s)
139                         return false;
140
141                 return s->queue->count() > s->minQueueSize;
142         }
143
144         static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
145         {
146                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
147
148                 if (!s)
149                         return false;
150
151                 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
152
153                 watcher->callback(s->queue);
154                 return true;
155         }
156
157         static void finalize(GSource* source)
158         {
159
160         }
161 };
162 }  // namespace amb