2 * Copyright (c) 2013, Ford Motor Company
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following
13 * disclaimer in the documentation and/or other materials provided with the
16 * Neither the name of the Ford Motor Company nor the names of its contributors
17 * may be used to endorse or promote products derived from this software
18 * without specific prior written permission.
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
33 #ifndef MESSAGE_QUEUE_CLASS
34 #define MESSAGE_QUEUE_CLASS
38 #include "utils/conditional_variable.h"
39 #include "utils/lock.h"
40 #include "utils/logger.h"
41 #include "utils/prioritized_queue.h"
45 * \brief Wrapper for multithreading queue.
48 template<typename T, class Q = std::queue<T> > class MessageQueue {
52 * \brief Default constructor
62 * \brief Returns size of the queue.
63 * \return Size of the queue.
68 * \brief If queue is empty.
69 * \return Is queue empty.
74 * \brief Tells if queue is being shut down
76 bool IsShuttingDown() const;
79 * \brief Adds element to the queue.
80 * \param element Element to be added to the queue.n
82 void push(const T& element);
85 * \brief Removes element from the queue and returns it.
86 * \return To element of the queue.
91 * \brief Conditional wait.
96 * \brief Shutdown the queue.
97 * This leads to waking up everyone waiting on the queue
98 * Queue being shut down can be drained ( with pop() )
99 * But nothing must be added to the queue after it began
109 volatile bool shutting_down_;
111 *\brief Platform specific syncronisation variable
113 mutable sync_primitives::Lock queue_lock_;
114 mutable sync_primitives::ConditionalVariable queue_new_items_;
117 template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
118 : shutting_down_(false) {
121 template<typename T, class Q> MessageQueue<T, Q>::~MessageQueue() {
122 if (!queue_.empty()) {
123 log4cxx::LoggerPtr logger =
124 log4cxx::LoggerPtr(log4cxx::Logger::getLogger("Utils"));
125 LOG4CXX_ERROR(logger, "Destruction of non-drained queue");
129 template<typename T, class Q> void MessageQueue<T, Q>::wait() {
130 sync_primitives::AutoLock auto_lock(queue_lock_);
131 while (!shutting_down_ && queue_.empty()) {
132 queue_new_items_.Wait(auto_lock);
136 template<typename T, class Q> int32_t MessageQueue<T, Q>::size() const {
137 sync_primitives::AutoLock auto_lock(queue_lock_);
138 return queue_.size();
141 template<typename T, class Q> bool MessageQueue<T, Q>::empty() const {
142 sync_primitives::AutoLock auto_lock(queue_lock_);
143 return queue_.empty();
146 template<typename T, class Q> bool MessageQueue<T, Q>::IsShuttingDown() const {
147 sync_primitives::AutoLock auto_lock(queue_lock_);
148 return shutting_down_;
151 template<typename T, class Q> void MessageQueue<T, Q>::push(const T& element) {
152 sync_primitives::AutoLock auto_lock(queue_lock_);
153 if (shutting_down_) {
154 log4cxx::LoggerPtr logger =
155 log4cxx::LoggerPtr(log4cxx::Logger::getLogger("Utils"));
156 LOG4CXX_ERROR(logger, "Runtime error, pushing into queue"
157 " that is being shut down");
159 queue_.push(element);
160 queue_new_items_.Broadcast();
163 template<typename T, class Q> T MessageQueue<T, Q>::pop() {
164 sync_primitives::AutoLock auto_lock(queue_lock_);
165 if (queue_.empty()) {
166 log4cxx::LoggerPtr logger =
167 log4cxx::LoggerPtr(log4cxx::Logger::getLogger("Utils"));
168 LOG4CXX_ERROR(logger, "Runtime error, popping out of empty que");
170 T result = queue_.front();
175 template<typename T, class Q> void MessageQueue<T, Q>::Shutdown() {
176 sync_primitives::AutoLock auto_lock(queue_lock_);
177 shutting_down_ = true;
178 queue_new_items_.Broadcast();
181 #endif // MESSAGE_QUEUE_CLASS