template <class DerivedService, class Event>
class ServiceThreadDispatcher {
std::mutex m_eventQueueMutex;
+ std::mutex m_hi_prio_eventQueueMutex;
std::condition_variable m_waitCondition;
+ std::condition_variable m_hi_prio_waitCondition;
bool m_quit = false;
std::queue<Event *> m_eventQueues[Priority::END];
- std::thread m_thread; // initialized last
+ std::thread m_thread;
+ std::thread m_hi_prio_thread; // only for Priority::HIGHEST
public:
- ServiceThreadDispatcher() : m_thread(&ServiceThreadDispatcher::ThreadLoop, this) {}
+ ServiceThreadDispatcher() : m_thread(&ServiceThreadDispatcher::ThreadLoop, this),
+ m_hi_prio_thread(&ServiceThreadDispatcher::HiPrioThreadLoop, this) {}
~ServiceThreadDispatcher() {
{
std::lock_guard<std::mutex> lock(m_eventQueueMutex);
+ std::lock_guard<std::mutex> lockHiPrio(m_hi_prio_eventQueueMutex);
m_quit = true;
}
m_waitCondition.notify_one();
+ m_hi_prio_waitCondition.notify_one();
m_thread.join();
-
+ m_hi_prio_thread.join();
// clear the event queue
for (auto &queue: m_eventQueues)
while (!queue.empty()) {
void PutEvent(Priority priority, T&&...arg) {
assert(priority < arraySize(m_eventQueues));
const auto event = new Event{ std::forward<T>(arg)... };
+ if (priority == Priority::HIGHEST)
{
- std::lock_guard<std::mutex> lock(m_eventQueueMutex);
- m_eventQueues[priority].emplace(event);
+ {
+ std::lock_guard<std::mutex> lock(m_hi_prio_eventQueueMutex);
+ m_eventQueues[priority].emplace(event);
+ }
+ m_hi_prio_waitCondition.notify_one();
+ } else
+ {
+ {
+ std::lock_guard<std::mutex> lock(m_eventQueueMutex);
+ m_eventQueues[priority].emplace(event);
+ }
+ m_waitCondition.notify_one();
}
- m_waitCondition.notify_one();
}
private:
for (;;) {
if (m_quit)
return;
- for (auto &queue: m_eventQueues)
+ for (int i = Priority::HIGH; i < Priority::END; ++i)
+ {
+ auto &queue = m_eventQueues[i];
if (!queue.empty()) {
event = queue.front();
queue.pop();
goto handleOneEvent;
}
+ }
m_waitCondition.wait(ulock);
}
}
UNHANDLED_EXCEPTION_HANDLER_END
}
}
+
+ // Only high priority
+ void HiPrioThreadLoop() {
+ for (;;) {
+ Event *event;
+ {
+ std::unique_lock<std::mutex> ulock(m_hi_prio_eventQueueMutex);
+ for (;;) {
+ if (m_quit)
+ return;
+ auto &queue = m_eventQueues[Priority::HIGHEST];
+ if (!queue.empty()) {
+ event = queue.front();
+ queue.pop();
+ goto handleOneHiPrioEvent;
+ }
+ m_hi_prio_waitCondition.wait(ulock);
+ }
+ }
+
+ handleOneHiPrioEvent:
+ UNHANDLED_EXCEPTION_HANDLER_BEGIN
+ {
+ const auto eventGuard = makeUnique(event);
+ static_cast<DerivedService*>(this)->processEvent(std::move(*event));
+ }
+ UNHANDLED_EXCEPTION_HANDLER_END
+ }
+ }
};
} // namespace SecurityManager