[websocket] fixed getRanged requests
[profile/ivi/automotive-message-broker.git] / lib / asyncqueue.hpp
1 /*
2         Copyright (C) 2014  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <glib.h>
20
21 #include <abstractpropertytype.h>
22 #include "listplusplus.h"
23
24 #include <mutex>
25 #include <condition_variable>
26 #include <unordered_set>
27
28 namespace amb
29 {
30
31 template <typename T, class Pred = std::equal_to<T> >
32 class Queue
33 {
34 public:
35         Queue(bool blocking = false)
36                 :mBlocking(blocking)
37         {
38
39         }
40
41         virtual ~Queue()
42         {
43
44         }
45
46         int count()
47         {
48                 std::lock_guard<std::mutex> lock(mutex);
49
50                 return mQueue.size();
51         }
52
53         T pop()
54         {
55                 std::unique_lock<std::mutex> lock(mutex);
56
57                 if(mBlocking)
58                 {
59                         if(!mQueue.size())
60                         {
61                                 cond.wait(lock);
62                         }
63                 }
64
65                 if(!mQueue.size())
66                         throw std::runtime_error("nothing in queue");
67
68                 auto itr = mQueue.begin();
69
70                 T item = *itr;
71
72                 mQueue.erase(itr);
73
74                 return item;
75         }
76
77         virtual void append(T item)
78         {
79                 {
80                         std::lock_guard<std::mutex> lock(mutex);
81                         mQueue.insert(item);
82                 }
83
84                 if(mBlocking)
85                 {
86                         cond.notify_all();
87                 }
88         }
89
90         void remove(T item)
91         {
92                 std::lock_guard<std::mutex> lock(mutex);
93                 removeOne(&mQueue, item);
94         }
95
96 protected:
97         bool mBlocking;
98         std::mutex mutex;
99         std::condition_variable cond;
100         std::unordered_set<T, std::hash<T>, Pred> mQueue;
101 };
102
103 template <typename T, class Pred = std::equal_to<T> >
104 struct AsyncQueueSource{
105         GSource source;
106         Queue<T, Pred>* queue;
107         int minQueueSize;
108 };
109
110 template <typename T, class Pred = std::equal_to<T> >
111 class AsyncQueueWatcher
112 {
113 public:
114         typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
115         AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
116                 : callback(cb), mMaxQueueSize(queueSize)
117         {
118
119                 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
120                 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
121
122                 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
123                 watch->queue = queue;
124                 watch->minQueueSize = queueSize;
125
126                 gint p = G_PRIORITY_DEFAULT;
127
128                 if(priority == AbstractPropertyType::Normal)
129                         p = G_PRIORITY_DEFAULT;
130                 else if(priority == AbstractPropertyType::High)
131                         p = G_PRIORITY_HIGH;
132                 else if(priority == AbstractPropertyType::Low)
133                         p = G_PRIORITY_LOW;
134
135                 g_source_set_priority(source, p);
136                 g_source_set_callback(source, nullptr, this, nullptr);
137
138                 g_source_attach(source, nullptr);
139                 g_source_unref(source);
140         }
141
142         AsyncQueueWatcherCallback callback;
143
144
145 protected:
146         AsyncQueueWatcher(){}
147
148         int mMaxQueueSize;
149
150 private:
151
152         static gboolean prepare(GSource *source, gint *timeout)
153         {
154                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
155                 *timeout = -1;
156
157                 if (!s)
158                         return false;
159
160                 return s->queue->count() > s->minQueueSize;
161         }
162
163         static gboolean check(GSource *source)
164         {
165                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
166
167                 if (!s)
168                         return false;
169
170                 return s->queue->count() > s->minQueueSize;
171         }
172
173         static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
174         {
175                 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
176
177                 if (!s)
178                         return false;
179
180                 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
181
182                 watcher->callback(s->queue);
183                 return true;
184         }
185
186         static void finalize(GSource* source)
187         {
188
189         }
190 };
191 }  // namespace amb