1 //////////////////////////////////////////////////////////////////////////////
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 // See http://www.boost.org/libs/interprocess for documentation.
9 //////////////////////////////////////////////////////////////////////////////
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
18 #include <boost/interprocess/detail/config_begin.hpp>
19 #include <boost/interprocess/detail/workaround.hpp>
21 #include <boost/interprocess/shared_memory_object.hpp>
22 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
23 #include <boost/interprocess/sync/interprocess_condition.hpp>
24 #include <boost/interprocess/sync/interprocess_mutex.hpp>
25 #include <boost/interprocess/sync/scoped_lock.hpp>
26 #include <boost/interprocess/detail/utilities.hpp>
27 #include <boost/interprocess/offset_ptr.hpp>
28 #include <boost/interprocess/creation_tags.hpp>
29 #include <boost/interprocess/exceptions.hpp>
30 #include <boost/interprocess/permissions.hpp>
31 #include <boost/detail/no_exceptions_support.hpp>
32 #include <boost/interprocess/detail/type_traits.hpp>
33 #include <boost/intrusive/pointer_traits.hpp>
34 #include <boost/type_traits/make_unsigned.hpp>
35 #include <boost/type_traits/alignment_of.hpp>
36 #include <boost/intrusive/pointer_traits.hpp>
37 #include <boost/assert.hpp>
38 #include <algorithm> //std::lower_bound
39 #include <cstddef> //std::size_t
40 #include <cstring> //memcpy
44 //!Describes an inter-process message queue. This class allows sending
45 //!messages between processes and allows blocking, non-blocking and timed
46 //!sending and receiving.
48 namespace boost{ namespace interprocess{
52 template<class VoidPointer>
53 class msg_queue_initialization_func_t;
56 //!A class that allows sending messages
58 template<class VoidPointer>
61 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
63 enum block_t { blocking, timed, non_blocking };
66 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
69 typedef VoidPointer void_pointer;
70 typedef typename boost::intrusive::
71 pointer_traits<void_pointer>::template
72 rebind_pointer<char>::type char_ptr;
73 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
74 typedef typename boost::make_unsigned<difference_type>::type size_type;
76 //!Creates a process shared message queue with name "name". For this message queue,
77 //!the maximum number of messages will be "max_num_msg" and the maximum message size
78 //!will be "max_msg_size". Throws on error and if the queue was previously created.
79 message_queue_t(create_only_t create_only,
81 size_type max_num_msg,
82 size_type max_msg_size,
83 const permissions &perm = permissions());
85 //!Opens or creates a process shared message queue with name "name".
86 //!If the queue is created, the maximum number of messages will be "max_num_msg"
87 //!and the maximum message size will be "max_msg_size". If queue was previously
88 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
89 //!are ignored. Throws on error.
90 message_queue_t(open_or_create_t open_or_create,
92 size_type max_num_msg,
93 size_type max_msg_size,
94 const permissions &perm = permissions());
96 //!Opens a previously created process shared message queue with name "name".
97 //!If the queue was not previously created or there are no free resources,
99 message_queue_t(open_only_t open_only,
102 //!Destroys *this and indicates that the calling process is finished using
103 //!the resource. All opened message queues are still
104 //!valid after destruction. The destructor function will deallocate
105 //!any system resources allocated by the system for use by this process for
106 //!this resource. The resource can still be opened again calling
107 //!the open constructor overload. To erase the message queue from the system
111 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
112 //!message queue with priority "priority". If the message queue is full
113 //!the sender is blocked. Throws interprocess_error on error.
114 void send (const void *buffer, size_type buffer_size,
115 unsigned int priority);
117 //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
118 //!message queue with priority "priority". If the message queue is full
119 //!the sender is not blocked and returns false, otherwise returns true.
120 //!Throws interprocess_error on error.
121 bool try_send (const void *buffer, size_type buffer_size,
122 unsigned int priority);
124 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
125 //!message queue with priority "priority". If the message queue is full
126 //!the sender retries until time "abs_time" is reached. Returns true if
127 //!the message has been successfully sent. Returns false if timeout is reached.
128 //!Throws interprocess_error on error.
129 bool timed_send (const void *buffer, size_type buffer_size,
130 unsigned int priority, const boost::posix_time::ptime& abs_time);
132 //!Receives a message from the message queue. The message is stored in buffer
133 //!"buffer", which has size "buffer_size". The received message has size
134 //!"recvd_size" and priority "priority". If the message queue is empty
135 //!the receiver is blocked. Throws interprocess_error on error.
136 void receive (void *buffer, size_type buffer_size,
137 size_type &recvd_size,unsigned int &priority);
139 //!Receives a message from the message queue. The message is stored in buffer
140 //!"buffer", which has size "buffer_size". The received message has size
141 //!"recvd_size" and priority "priority". If the message queue is empty
142 //!the receiver is not blocked and returns false, otherwise returns true.
143 //!Throws interprocess_error on error.
144 bool try_receive (void *buffer, size_type buffer_size,
145 size_type &recvd_size,unsigned int &priority);
147 //!Receives a message from the message queue. The message is stored in buffer
148 //!"buffer", which has size "buffer_size". The received message has size
149 //!"recvd_size" and priority "priority". If the message queue is empty
150 //!the receiver retries until time "abs_time" is reached. Returns true if
151 //!the message has been successfully sent. Returns false if timeout is reached.
152 //!Throws interprocess_error on error.
153 bool timed_receive (void *buffer, size_type buffer_size,
154 size_type &recvd_size,unsigned int &priority,
155 const boost::posix_time::ptime &abs_time);
157 //!Returns the maximum number of messages allowed by the queue. The message
158 //!queue must be opened or created previously. Otherwise, returns 0.
160 size_type get_max_msg() const;
162 //!Returns the maximum size of message allowed by the queue. The message
163 //!queue must be opened or created previously. Otherwise, returns 0.
165 size_type get_max_msg_size() const;
167 //!Returns the number of messages currently stored.
169 size_type get_num_msg() const;
171 //!Removes the message queue from the system.
172 //!Returns false on error. Never throws
173 static bool remove(const char *name);
175 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
177 typedef boost::posix_time::ptime ptime;
179 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
181 bool do_receive(block_t block,
182 void *buffer, size_type buffer_size,
183 size_type &recvd_size, unsigned int &priority,
184 const ptime &abs_time);
186 bool do_send(block_t block,
187 const void *buffer, size_type buffer_size,
188 unsigned int priority, const ptime &abs_time);
190 //!Returns the needed memory size for the shared message queue.
192 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
193 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
194 open_create_impl_t m_shmem;
195 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
198 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
200 namespace ipcdetail {
202 //!This header is the prefix of each message in the queue
203 template<class VoidPointer>
206 typedef VoidPointer void_pointer;
207 typedef typename boost::intrusive::
208 pointer_traits<void_pointer>::template
209 rebind_pointer<char>::type char_ptr;
210 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
211 typedef typename boost::make_unsigned<difference_type>::type size_type;
214 size_type len; // Message length
215 unsigned int priority;// Message priority
216 //!Returns the data buffer associated with this this message
217 void * data(){ return this+1; } //
220 //!This functor is the predicate to order stored messages by priority
221 template<class VoidPointer>
222 class priority_functor
224 typedef typename boost::intrusive::
225 pointer_traits<VoidPointer>::template
226 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
229 bool operator()(const msg_hdr_ptr_t &msg1,
230 const msg_hdr_ptr_t &msg2) const
231 { return msg1->priority < msg2->priority; }
234 //!This header is placed in the beginning of the shared memory and contains
235 //!the data to control the queue. This class initializes the shared memory
236 //!in the following way: in ascending memory address with proper alignment
240 //! Main control block that controls the rest of the elements
242 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
243 //! An array of pointers with size "max_num_msg" called index. Each pointer
244 //! points to a preallocated message. Elements of this array are
245 //! reordered in runtime in the following way:
247 //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
249 //! When the current number of messages is "cur_num_msg", the array
250 //! is treated like a circular buffer. Starting from position "cur_first_msg"
251 //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest
252 //! point to free messages. Those "cur_num_msg" pointers are
253 //! ordered by the priority of the pointed message and by insertion order
254 //! if two messages have the same priority. So the next message to be
255 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
256 //! and the first free message ready to be used in a "send" operation is
257 //! [cur_first_msg] if circular buffer is extended from front,
258 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
260 //! This transforms the index in a circular buffer with an embedded free
263 //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
265 //! When the current number of messages is "cur_num_msg", the first
266 //! "cur_num_msg" pointers point to inserted messages and the rest
267 //! point to free messages. The first "cur_num_msg" pointers are
268 //! ordered by the priority of the pointed message and by insertion order
269 //! if two messages have the same priority. So the next message to be
270 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
271 //! message ready to be used in a "send" operation is index [cur_num_msg].
273 //! This transforms the index in a fixed size priority queue with an embedded free
276 //!-> struct message_t
278 //! msg_hdr_t header;
279 //! char[max_msg_size] data;
280 //! } messages [max_num_msg];
282 //! An array of buffers of preallocated messages, each one prefixed with the
283 //! msg_hdr_t structure. Each of this message is pointed by one pointer of
284 //! the index structure.
285 template<class VoidPointer>
287 : public ipcdetail::priority_functor<VoidPointer>
289 typedef VoidPointer void_pointer;
290 typedef msg_hdr_t<void_pointer> msg_header;
291 typedef typename boost::intrusive::
292 pointer_traits<void_pointer>::template
293 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
294 typedef typename boost::intrusive::pointer_traits
295 <msg_hdr_ptr_t>::difference_type difference_type;
296 typedef typename boost::make_unsigned<difference_type>::type size_type;
297 typedef typename boost::intrusive::
298 pointer_traits<void_pointer>::template
299 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
300 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
303 //!Constructor. This object must be constructed in the beginning of the
304 //!shared memory of the size returned by the function "get_mem_size".
305 //!This constructor initializes the needed resources and creates
306 //!the internal structures like the priority index. This can throw.
307 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
308 : m_max_num_msg(max_num_msg),
309 m_max_msg_size(max_msg_size),
311 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
313 ,m_blocked_senders(0u)
314 ,m_blocked_receivers(0u)
316 { this->initialize_memory(); }
318 //!Returns true if the message queue is full
320 { return m_cur_num_msg == m_max_num_msg; }
322 //!Returns true if the message queue is empty
323 bool is_empty() const
324 { return !m_cur_num_msg; }
326 //!Frees the top priority message and saves it in the free message list
330 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
332 typedef msg_hdr_ptr_t *iterator;
334 size_type end_pos() const
336 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
337 return space_until_bufend > m_cur_num_msg
338 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
341 //!Returns the inserted message with top priority
342 msg_header &top_msg()
344 size_type pos = this->end_pos();
345 return *mp_index[pos ? --pos : m_max_num_msg - 1];
348 //!Returns the inserted message with bottom priority
349 msg_header &bottom_msg()
350 { return *mp_index[m_cur_first_msg]; }
352 iterator inserted_ptr_begin() const
353 { return &mp_index[m_cur_first_msg]; }
355 iterator inserted_ptr_end() const
356 { return &mp_index[this->end_pos()]; }
358 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
360 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
362 iterator idx_end = &mp_index[m_max_num_msg];
363 iterator ret = std::lower_bound(begin, idx_end, value, func);
365 iterator idx_beg = &mp_index[0];
366 ret = std::lower_bound(idx_beg, end, value, func);
367 //sanity check, these cases should not call lower_bound (optimized out)
368 BOOST_ASSERT(ret != end);
369 BOOST_ASSERT(ret != begin);
377 return std::lower_bound(begin, end, value, func);
381 msg_header & insert_at(iterator where)
383 iterator it_inserted_ptr_end = this->inserted_ptr_end();
384 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
385 if(where == it_inserted_ptr_beg){
386 //unsigned integer guarantees underflow
387 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
390 return *mp_index[m_cur_first_msg];
392 else if(where == it_inserted_ptr_end){
394 return **it_inserted_ptr_end;
397 size_type pos = where - &mp_index[0];
398 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
399 //Check if it's more efficient to move back or move front
400 if(circ_pos < m_cur_num_msg/2){
401 //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
402 //indicates two step insertion
405 where = &mp_index[m_max_num_msg-1];
410 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
411 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
412 const size_type first_segment_end = pos;
413 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
414 const size_type second_segment_end = m_max_num_msg;
415 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
419 std::copy( &mp_index[0] + second_segment_beg
420 , &mp_index[0] + second_segment_end
421 , &mp_index[0] + second_segment_beg - 1);
422 mp_index[m_max_num_msg-1] = mp_index[0];
424 std::copy( &mp_index[0] + first_segment_beg
425 , &mp_index[0] + first_segment_end
426 , &mp_index[0] + first_segment_beg - 1);
428 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
434 //The queue can't be full so end_pos < m_cur_first_msg
435 //indicates two step insertion
436 const size_type pos_end = this->end_pos();
437 const bool unique_segment = pos < pos_end;
438 const size_type first_segment_beg = pos;
439 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
440 const size_type second_segment_beg = 0u;
441 const size_type second_segment_end = unique_segment ? 0u : pos_end;
442 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
446 std::copy_backward( &mp_index[0] + second_segment_beg
447 , &mp_index[0] + second_segment_end
448 , &mp_index[0] + second_segment_end + 1);
449 mp_index[0] = mp_index[m_max_num_msg-1];
451 std::copy_backward( &mp_index[0] + first_segment_beg
452 , &mp_index[0] + first_segment_end
453 , &mp_index[0] + first_segment_end + 1);
461 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
463 typedef msg_hdr_ptr_t *iterator;
465 //!Returns the inserted message with top priority
466 msg_header &top_msg()
467 { return *mp_index[m_cur_num_msg-1]; }
469 //!Returns the inserted message with bottom priority
470 msg_header &bottom_msg()
471 { return *mp_index[0]; }
473 iterator inserted_ptr_begin() const
474 { return &mp_index[0]; }
476 iterator inserted_ptr_end() const
477 { return &mp_index[m_cur_num_msg]; }
479 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
480 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
482 msg_header & insert_at(iterator pos)
484 const msg_hdr_ptr_t backup = *inserted_ptr_end();
485 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
491 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
493 //!Inserts the first free message in the priority queue
494 msg_header & queue_free_msg(unsigned int priority)
496 //Get priority queue's range
497 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
498 //Optimize for non-priority usage
499 if(m_cur_num_msg && priority > this->bottom_msg().priority){
500 //Check for higher priority than all stored messages
501 if(priority > this->top_msg().priority){
505 //Since we don't now which free message we will pick
506 //build a dummy header for searches
507 msg_header dummy_hdr;
508 dummy_hdr.priority = priority;
511 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
513 //Check where the free message should be placed
514 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
517 //Insert the free message in the correct position
518 return this->insert_at(it);
521 //!Returns the number of bytes needed to construct a message queue with
522 //!"max_num_size" maximum number of messages and "max_msg_size" maximum
523 //!message size. Never throws.
524 static size_type get_mem_size
525 (size_type max_msg_size, size_type max_num_msg)
528 msg_hdr_align = ::boost::alignment_of<msg_header>::value,
529 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value,
530 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
531 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
532 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
533 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
534 open_create_impl_t::ManagedOpenOrCreateUserOffset;
537 //!Initializes the memory structures to preallocate messages and constructs the
538 //!message index. Never throws.
539 void initialize_memory()
542 msg_hdr_align = ::boost::alignment_of<msg_header>::value,
543 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value,
544 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
545 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
546 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
548 //Pointer to the index
549 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*>
550 (reinterpret_cast<char*>(this)+r_hdr_size);
552 //Pointer to the first message header
553 msg_header *msg_hdr = reinterpret_cast<msg_header*>
554 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
556 //Initialize the pointer to the index
559 //Initialize the index so each slot points to a preallocated message
560 for(size_type i = 0; i < m_max_num_msg; ++i){
562 msg_hdr = reinterpret_cast<msg_header*>
563 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
568 //Pointer to the index
569 msg_hdr_ptr_ptr_t mp_index;
570 //Maximum number of messages of the queue
571 const size_type m_max_num_msg;
572 //Maximum size of messages of the queue
573 const size_type m_max_msg_size;
574 //Current number of messages
575 size_type m_cur_num_msg;
576 //Mutex to protect data structures
577 interprocess_mutex m_mutex;
578 //Condition block receivers when there are no messages
579 interprocess_condition m_cond_recv;
580 //Condition block senders when the queue is full
581 interprocess_condition m_cond_send;
582 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
583 //Current start offset in the circular index
584 size_type m_cur_first_msg;
585 size_type m_blocked_senders;
586 size_type m_blocked_receivers;
591 //!This is the atomic functor to be executed when creating or opening
592 //!shared memory. Never throws
593 template<class VoidPointer>
594 class msg_queue_initialization_func_t
597 typedef typename boost::intrusive::
598 pointer_traits<VoidPointer>::template
599 rebind_pointer<char>::type char_ptr;
600 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
601 typedef typename boost::make_unsigned<difference_type>::type size_type;
603 msg_queue_initialization_func_t(size_type maxmsg = 0,
604 size_type maxmsgsize = 0)
605 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
607 bool operator()(void *address, size_type, bool created)
612 mptr = reinterpret_cast<char*>(address);
613 //Construct the message queue header at the beginning
615 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
625 std::size_t get_min_size() const
627 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
628 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
631 const size_type m_maxmsg;
632 const size_type m_maxmsgsize;
635 } //namespace ipcdetail {
637 template<class VoidPointer>
638 inline message_queue_t<VoidPointer>::~message_queue_t()
641 template<class VoidPointer>
642 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
643 (size_type max_msg_size, size_type max_num_msg)
644 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
646 template<class VoidPointer>
647 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
649 size_type max_num_msg,
650 size_type max_msg_size,
651 const permissions &perm)
652 //Create shared memory and execute functor atomically
653 : m_shmem(create_only,
655 get_mem_size(max_msg_size, max_num_msg),
657 static_cast<void*>(0),
658 //Prepare initialization functor
659 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
663 template<class VoidPointer>
664 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
666 size_type max_num_msg,
667 size_type max_msg_size,
668 const permissions &perm)
669 //Create shared memory and execute functor atomically
670 : m_shmem(open_or_create,
672 get_mem_size(max_msg_size, max_num_msg),
674 static_cast<void*>(0),
675 //Prepare initialization functor
676 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
680 template<class VoidPointer>
681 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
682 //Create shared memory and execute functor atomically
686 static_cast<void*>(0),
687 //Prepare initialization functor
688 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
691 template<class VoidPointer>
692 inline void message_queue_t<VoidPointer>::send
693 (const void *buffer, size_type buffer_size, unsigned int priority)
694 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
696 template<class VoidPointer>
697 inline bool message_queue_t<VoidPointer>::try_send
698 (const void *buffer, size_type buffer_size, unsigned int priority)
699 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
701 template<class VoidPointer>
702 inline bool message_queue_t<VoidPointer>::timed_send
703 (const void *buffer, size_type buffer_size
704 ,unsigned int priority, const boost::posix_time::ptime &abs_time)
706 if(abs_time == boost::posix_time::pos_infin){
707 this->send(buffer, buffer_size, priority);
710 return this->do_send(timed, buffer, buffer_size, priority, abs_time);
713 template<class VoidPointer>
714 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
715 const void *buffer, size_type buffer_size,
716 unsigned int priority, const boost::posix_time::ptime &abs_time)
718 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
719 //Check if buffer is smaller than maximum allowed
720 if (buffer_size > p_hdr->m_max_msg_size) {
721 throw interprocess_exception(size_error);
724 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
725 bool notify_blocked_receivers = false;
727 //---------------------------------------------
728 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
729 //---------------------------------------------
731 //If the queue is full execute blocking logic
732 if (p_hdr->is_full()) {
734 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
735 ++p_hdr->m_blocked_senders;
739 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
740 --p_hdr->m_blocked_senders;
747 p_hdr->m_cond_send.wait(lock);
749 while (p_hdr->is_full());
754 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
755 if(p_hdr->is_full()){
756 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
757 --p_hdr->m_blocked_senders;
764 while (p_hdr->is_full());
769 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
770 --p_hdr->m_blocked_senders;
774 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
775 --p_hdr->m_blocked_senders;
782 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
783 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
785 //Insert the first free message in the priority queue
786 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
788 //Sanity check, free msgs are always cleaned when received
789 BOOST_ASSERT(free_msg_hdr.priority == 0);
790 BOOST_ASSERT(free_msg_hdr.len == 0);
792 //Copy control data to the free message
793 free_msg_hdr.priority = priority;
794 free_msg_hdr.len = buffer_size;
796 //Copy user buffer to the message
797 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
800 //Notify outside lock to avoid contention. This might produce some
801 //spurious wakeups, but it's usually far better than notifying inside.
802 //If this message changes the queue empty state, notify it to receivers
803 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
804 if (notify_blocked_receivers){
805 p_hdr->m_cond_recv.notify_one();
808 p_hdr->m_cond_recv.notify_one();
814 template<class VoidPointer>
815 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
816 size_type &recvd_size, unsigned int &priority)
817 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
819 template<class VoidPointer>
821 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
822 size_type &recvd_size, unsigned int &priority)
823 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
825 template<class VoidPointer>
827 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
828 size_type &recvd_size, unsigned int &priority,
829 const boost::posix_time::ptime &abs_time)
831 if(abs_time == boost::posix_time::pos_infin){
832 this->receive(buffer, buffer_size, recvd_size, priority);
835 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
838 template<class VoidPointer>
840 message_queue_t<VoidPointer>::do_receive(block_t block,
841 void *buffer, size_type buffer_size,
842 size_type &recvd_size, unsigned int &priority,
843 const boost::posix_time::ptime &abs_time)
845 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
846 //Check if buffer is big enough for any message
847 if (buffer_size < p_hdr->m_max_msg_size) {
848 throw interprocess_exception(size_error);
851 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
852 bool notify_blocked_senders = false;
854 //---------------------------------------------
855 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
856 //---------------------------------------------
858 //If there are no messages execute blocking logic
859 if (p_hdr->is_empty()) {
861 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
862 ++p_hdr->m_blocked_receivers;
866 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
867 --p_hdr->m_blocked_receivers;
874 p_hdr->m_cond_recv.wait(lock);
876 while (p_hdr->is_empty());
881 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
882 if(p_hdr->is_empty()){
883 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
884 --p_hdr->m_blocked_receivers;
891 while (p_hdr->is_empty());
898 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
899 --p_hdr->m_blocked_receivers;
903 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
904 --p_hdr->m_blocked_receivers;
911 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
912 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
915 //There is at least one message ready to pick, get the top one
916 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
918 //Get data from the message
919 recvd_size = top_msg.len;
920 priority = top_msg.priority;
922 //Some cleanup to ease debugging
924 top_msg.priority = 0;
926 //Copy data to receiver's bufers
927 std::memcpy(buffer, top_msg.data(), recvd_size);
929 //Free top message and put it in the free message list
930 p_hdr->free_top_msg();
933 //Notify outside lock to avoid contention. This might produce some
934 //spurious wakeups, but it's usually far better than notifying inside.
935 //If this reception changes the queue full state, notify senders
936 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
937 if (notify_blocked_senders){
938 p_hdr->m_cond_send.notify_one();
941 p_hdr->m_cond_send.notify_one();
947 template<class VoidPointer>
948 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
950 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
951 return p_hdr ? p_hdr->m_max_num_msg : 0; }
953 template<class VoidPointer>
954 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
956 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
957 return p_hdr ? p_hdr->m_max_msg_size : 0;
960 template<class VoidPointer>
961 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
963 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
965 //---------------------------------------------
966 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
967 //---------------------------------------------
968 return p_hdr->m_cur_num_msg;
974 template<class VoidPointer>
975 inline bool message_queue_t<VoidPointer>::remove(const char *name)
976 { return shared_memory_object::remove(name); }
980 //!Typedef for a default message queue
981 //!to be used between processes
982 typedef message_queue_t<offset_ptr<void> > message_queue;
984 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
986 }} //namespace boost{ namespace interprocess{
988 #include <boost/interprocess/detail/config_end.hpp>
990 #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP