2 Copyright (C) 2014 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <abstractpropertytype.h>
24 #include <unordered_set>
29 template <typename T, class Pred = std::equal_to<T> >
44 std::lock_guard<std::mutex> lock(mutex);
51 std::lock_guard<std::mutex> lock(mutex);
53 auto itr = mQueue.begin();
62 virtual void append(T item)
64 std::lock_guard<std::mutex> lock(mutex);
71 std::unordered_set<T,std::hash<T>, Pred> mQueue;
74 template <typename T, class Pred = std::equal_to<T> >
75 struct AsyncQueueSource{
77 Queue<T, Pred>* queue;
81 template <typename T, class Pred = std::equal_to<T> >
82 class AsyncQueueWatcher
85 typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
86 AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
87 : callback(cb), mMaxQueueSize(queueSize)
90 static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
91 GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
93 AsyncQueueSource<T, Pred>* watch = (AsyncQueueSource<T, Pred>*)source;
95 watch->minQueueSize = queueSize;
97 gint p = G_PRIORITY_DEFAULT;
99 if(priority == AbstractPropertyType::Normal)
100 p = G_PRIORITY_DEFAULT;
101 else if(priority == AbstractPropertyType::High)
103 else if(priority == AbstractPropertyType::Low)
106 g_source_set_priority(source, p);
107 g_source_set_callback(source, nullptr, this, nullptr);
109 g_source_attach(source, nullptr);
110 g_source_unref(source);
113 AsyncQueueWatcherCallback callback;
117 AsyncQueueWatcher(){}
123 static gboolean prepare(GSource *source, gint *timeout)
125 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
131 return s->queue->count() > s->minQueueSize;
134 static gboolean check(GSource *source)
136 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
141 return s->queue->count() > s->minQueueSize;
144 static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
146 AsyncQueueSource<T, Pred>* s = (AsyncQueueSource<T, Pred>*)source;
151 AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
153 watcher->callback(s->queue);
157 static void finalize(GSource* source)