2 Copyright (C) 2014 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <abstractpropertytype.h>
22 #include "listplusplus.h"
25 #include <condition_variable>
26 #include <unordered_set>
31 template <typename T, class Pred = std::equal_to<T> >
35 Queue(bool blocking = false)
48 std::lock_guard<std::mutex> lock(mutex);
55 std::unique_lock<std::mutex> lock(mutex);
66 throw std::runtime_error("nothing in queue");
68 auto itr = mQueue.begin();
77 virtual void append(T item)
80 std::lock_guard<std::mutex> lock(mutex);
92 std::lock_guard<std::mutex> lock(mutex);
93 removeOne(&mQueue, item);
99 std::condition_variable cond;
100 std::unordered_set<T, std::hash<T>, Pred> mQueue;
103 template <typename T, class Pred = std::equal_to<T> >
104 struct AsyncQueueSource{
106 Queue<T, Pred>* queue;
110 template <typename T, class Pred = std::equal_to<T> >
111 class AsyncQueueWatcher
114 typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
115 AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
116 : callback(cb), mMaxQueueSize(queueSize)
119 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
120 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
122 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
123 watch->queue = queue;
124 watch->minQueueSize = queueSize;
126 gint p = G_PRIORITY_DEFAULT;
128 if(priority == AbstractPropertyType::Normal)
129 p = G_PRIORITY_DEFAULT;
130 else if(priority == AbstractPropertyType::High)
132 else if(priority == AbstractPropertyType::Low)
135 g_source_set_priority(source, p);
136 g_source_set_callback(source, nullptr, this, nullptr);
138 g_source_attach(source, nullptr);
139 g_source_unref(source);
142 AsyncQueueWatcherCallback callback;
146 AsyncQueueWatcher(){}
152 static gboolean prepare(GSource *source, gint *timeout)
154 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
160 return s->queue->count() > s->minQueueSize;
163 static gboolean check(GSource *source)
165 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
170 return s->queue->count() > s->minQueueSize;
173 static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
175 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
180 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
182 watcher->callback(s->queue);
186 static void finalize(GSource* source)