Merge pull request #67 from tripzero/trip
[profile/ivi/automotive-message-broker.git] / lib / asyncqueue.hpp
1 /*
2         Copyright (C) 2014  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
#include <glib.h>

#include <abstractpropertytype.h>
#include "listplusplus.h"

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <unordered_set>
#include <vector>
28
29 namespace amb
30 {
31
/**
 * Mutex-protected FIFO queue backed by a std::vector.
 *
 * Every public member takes the internal mutex, so a single Queue may be
 * shared between threads.
 *
 * @tparam T     element type; must be copyable and comparable with operator==.
 * @tparam Pred  kept for interface compatibility. It is not consulted by this
 *               class: duplicate detection uses operator==.
 */
template <typename T, class Pred = std::equal_to<T> >
class Queue
{
public:
	/**
	 * @param unique    stored in mUnique; note that append() currently drops
	 *                  duplicates regardless of this flag.
	 * @param blocking  when true, pop() waits for an item instead of throwing
	 *                  on an empty queue, and append() wakes waiting threads.
	 */
	Queue(bool unique = false, bool blocking = false)
		// Initializers follow declaration order (mBlocking is declared first).
		: mBlocking(blocking), mUnique(unique)
	{
	}

	virtual ~Queue()
	{
	}

	/// @return the number of items currently queued.
	int count()
	{
		std::lock_guard<std::mutex> lock(mutex);

		return static_cast<int>(mQueue.size());
	}

	/**
	 * Removes and returns the oldest item.
	 *
	 * In blocking mode the call waits until an item is available. The wait
	 * re-checks the queue on every wakeup, so a spurious wakeup or a second
	 * consumer that lost the race for the item goes back to sleep instead of
	 * failing.
	 *
	 * @throws std::runtime_error if the queue is empty and the queue is not
	 *         in blocking mode.
	 */
	T pop()
	{
		std::unique_lock<std::mutex> lock(mutex);

		if(mBlocking)
		{
			cond.wait(lock, [this]() { return !mQueue.empty(); });
		}

		if(mQueue.empty())
		{
			throw std::runtime_error("nothing in queue");
		}

		T item = mQueue.front();

		mQueue.erase(mQueue.begin());

		return item;
	}

	/**
	 * Appends @p item to the back of the queue.
	 *
	 * Any entries already in the queue that compare equal to @p item are
	 * removed first, so the item ends up queued exactly once, at the back.
	 * In blocking mode all threads waiting in pop() are then notified.
	 */
	virtual void append(T item)
	{
		{
			std::lock_guard<std::mutex> lock(mutex);

			// Drop every existing equal entry in one pass, then enqueue.
			mQueue.erase(std::remove(mQueue.begin(), mQueue.end(), item), mQueue.end());

			mQueue.push_back(item);
		}

		// Notify after releasing the lock so woken threads can take it at once.
		if(mBlocking)
		{
			cond.notify_all();
		}
	}

	/// Removes @p item from the queue via the removeOne() helper.
	void remove(T item)
	{
		std::lock_guard<std::mutex> lock(mutex);
		removeOne(&mQueue, item);
	}

private:
	bool mBlocking;
	bool mUnique;	// NOTE(review): never read; append() always de-duplicates.
	std::mutex mutex;
	std::condition_variable cond;
	std::vector<T> mQueue;
};
116
117 template <typename T, class Pred = std::equal_to<T> >
118 struct AsyncQueueSource{
119         GSource source;
120         Queue<T, Pred>* queue;
121         int minQueueSize;
122 };
123
124 template <typename T, class Pred = std::equal_to<T> >
125 class AsyncQueueWatcher
126 {
127 public:
128         typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
129         AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
130                 : callback(cb), mMaxQueueSize(queueSize)
131         {
132
133                 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
134                 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
135
136                 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
137                 watch->queue = queue;
138                 watch->minQueueSize = queueSize;
139
140                 gint p = G_PRIORITY_DEFAULT;
141
142                 if(priority == AbstractPropertyType::Normal)
143                         p = G_PRIORITY_DEFAULT;
144                 else if(priority == AbstractPropertyType::High)
145                         p = G_PRIORITY_HIGH;
146                 else if(priority == AbstractPropertyType::Low)
147                         p = G_PRIORITY_LOW;
148
149                 g_source_set_priority(source, p);
150                 g_source_set_callback(source, nullptr, this, nullptr);
151
152                 g_source_attach(source, nullptr);
153                 g_source_unref(source);
154         }
155
156         AsyncQueueWatcherCallback callback;
157
158
159 protected:
160         AsyncQueueWatcher(){}
161
162         int mMaxQueueSize;
163
164 private:
165
166         static gboolean prepare(GSource *source, gint *timeout)
167         {
168                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
169                 *timeout = -1;
170
171                 if (!s)
172                         return false;
173
174                 return s->queue->count() > s->minQueueSize;
175         }
176
177         static gboolean check(GSource *source)
178         {
179                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
180
181                 if (!s)
182                         return false;
183
184                 return s->queue->count() > s->minQueueSize;
185         }
186
187         static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
188         {
189                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
190
191                 if (!s)
192                         return false;
193
194                 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
195
196                 watcher->callback(s->queue);
197                 return true;
198         }
199
200         static void finalize(GSource* source)
201         {
202
203         }
204 };
205 }  // namespace amb