2 Copyright (C) 2014 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <abstractpropertytype.h>
22 #include "listplusplus.h"
25 #include <condition_variable>
26 #include <unordered_set>
32 template <typename T, class Pred = std::equal_to<T> >
36 Queue(bool unique = false, bool blocking = false)
37 :mUnique(unique), mBlocking(blocking)
49 std::lock_guard<std::mutex> lock(mutex);
56 std::unique_lock<std::mutex> lock(mutex);
68 throw std::runtime_error("nothing in queue");
71 auto itr = mQueue.begin();
80 virtual void append(T item)
83 std::lock_guard<std::mutex> lock(mutex);
85 auto i = mQueue.begin();
88 i = find(i, mQueue.end(), item);
89 if (i == mQueue.end())
94 mQueue.push_back(item);
105 std::lock_guard<std::mutex> lock(mutex);
106 removeOne(&mQueue, item);
113 std::condition_variable cond;
114 std::vector<T> mQueue;
117 template <typename T, class Pred = std::equal_to<T> >
118 struct AsyncQueueSource{
120 Queue<T, Pred>* queue;
124 template <typename T, class Pred = std::equal_to<T> >
125 class AsyncQueueWatcher
128 typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
129 AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
130 : callback(cb), mMaxQueueSize(queueSize)
133 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
134 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
136 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
137 watch->queue = queue;
138 watch->minQueueSize = queueSize;
140 gint p = G_PRIORITY_DEFAULT;
142 if(priority == AbstractPropertyType::Normal)
143 p = G_PRIORITY_DEFAULT;
144 else if(priority == AbstractPropertyType::High)
146 else if(priority == AbstractPropertyType::Low)
149 g_source_set_priority(source, p);
150 g_source_set_callback(source, nullptr, this, nullptr);
152 g_source_attach(source, nullptr);
153 g_source_unref(source);
156 AsyncQueueWatcherCallback callback;
160 AsyncQueueWatcher(){}
166 static gboolean prepare(GSource *source, gint *timeout)
168 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
174 return s->queue->count() > s->minQueueSize;
177 static gboolean check(GSource *source)
179 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
184 return s->queue->count() > s->minQueueSize;
187 static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
189 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
194 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
196 watcher->callback(s->queue);
200 static void finalize(GSource* source)