Imported Upstream version 1.57.0
[platform/upstream/boost.git] / boost / interprocess / ipc / message_queue.hpp
1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
13
14 #if defined(_MSC_VER)
15 #  pragma once
16 #endif
17
18 #include <boost/interprocess/detail/config_begin.hpp>
19 #include <boost/interprocess/detail/workaround.hpp>
20
21 #include <boost/interprocess/shared_memory_object.hpp>
22 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
23 #include <boost/interprocess/sync/interprocess_condition.hpp>
24 #include <boost/interprocess/sync/interprocess_mutex.hpp>
25 #include <boost/interprocess/sync/scoped_lock.hpp>
26 #include <boost/interprocess/detail/utilities.hpp>
27 #include <boost/interprocess/offset_ptr.hpp>
28 #include <boost/interprocess/creation_tags.hpp>
29 #include <boost/interprocess/exceptions.hpp>
30 #include <boost/interprocess/permissions.hpp>
31 #include <boost/detail/no_exceptions_support.hpp>
32 #include <boost/interprocess/detail/type_traits.hpp>
33 #include <boost/intrusive/pointer_traits.hpp>
34 #include <boost/type_traits/make_unsigned.hpp>
35 #include <boost/type_traits/alignment_of.hpp>
36 #include <boost/intrusive/pointer_traits.hpp>
37 #include <boost/assert.hpp>
38 #include <algorithm> //std::lower_bound
39 #include <cstddef>   //std::size_t
40 #include <cstring>   //memcpy
41
42
43 //!\file
44 //!Describes an inter-process message queue. This class allows sending
45 //!messages between processes and allows blocking, non-blocking and timed
46 //!sending and receiving.
47
48 namespace boost{  namespace interprocess{
49
50 namespace ipcdetail
51 {
52    template<class VoidPointer>
53    class msg_queue_initialization_func_t;
54 }
55
56 //!A class that allows sending messages
57 //!between processes.
58 template<class VoidPointer>
59 class message_queue_t
60 {
61    #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
62    //Blocking modes
63    enum block_t   {  blocking,   timed,   non_blocking   };
64
65    message_queue_t();
66    #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
67
68    public:
69    typedef VoidPointer                                                 void_pointer;
70    typedef typename boost::intrusive::
71       pointer_traits<void_pointer>::template
72          rebind_pointer<char>::type                                    char_ptr;
73    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
74    typedef typename boost::make_unsigned<difference_type>::type        size_type;
75
76    //!Creates a process shared message queue with name "name". For this message queue,
77    //!the maximum number of messages will be "max_num_msg" and the maximum message size
78    //!will be "max_msg_size". Throws on error and if the queue was previously created.
79    message_queue_t(create_only_t create_only,
80                  const char *name,
81                  size_type max_num_msg,
82                  size_type max_msg_size,
83                  const permissions &perm = permissions());
84
85    //!Opens or creates a process shared message queue with name "name".
86    //!If the queue is created, the maximum number of messages will be "max_num_msg"
87    //!and the maximum message size will be "max_msg_size". If queue was previously
88    //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
89    //!are ignored. Throws on error.
90    message_queue_t(open_or_create_t open_or_create,
91                  const char *name,
92                  size_type max_num_msg,
93                  size_type max_msg_size,
94                  const permissions &perm = permissions());
95
96    //!Opens a previously created process shared message queue with name "name".
97    //!If the queue was not previously created or there are no free resources,
98    //!throws an error.
99    message_queue_t(open_only_t open_only,
100                  const char *name);
101
102    //!Destroys *this and indicates that the calling process is finished using
103    //!the resource. All opened message queues are still
104    //!valid after destruction. The destructor function will deallocate
105    //!any system resources allocated by the system for use by this process for
106    //!this resource. The resource can still be opened again calling
107    //!the open constructor overload. To erase the message queue from the system
108    //!use remove().
109    ~message_queue_t();
110
111    //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
112    //!message queue with priority "priority". If the message queue is full
113    //!the sender is blocked. Throws interprocess_error on error.
114    void send (const void *buffer,     size_type buffer_size,
115               unsigned int priority);
116
117    //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
118    //!message queue with priority "priority". If the message queue is full
119    //!the sender is not blocked and returns false, otherwise returns true.
120    //!Throws interprocess_error on error.
121    bool try_send    (const void *buffer,     size_type buffer_size,
122                          unsigned int priority);
123
124    //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
125    //!message queue with priority "priority". If the message queue is full
126    //!the sender retries until time "abs_time" is reached. Returns true if
127    //!the message has been successfully sent. Returns false if timeout is reached.
128    //!Throws interprocess_error on error.
129    bool timed_send    (const void *buffer,     size_type buffer_size,
130                            unsigned int priority,  const boost::posix_time::ptime& abs_time);
131
132    //!Receives a message from the message queue. The message is stored in buffer
133    //!"buffer", which has size "buffer_size". The received message has size
134    //!"recvd_size" and priority "priority". If the message queue is empty
135    //!the receiver is blocked. Throws interprocess_error on error.
136    void receive (void *buffer,           size_type buffer_size,
137                  size_type &recvd_size,unsigned int &priority);
138
139    //!Receives a message from the message queue. The message is stored in buffer
140    //!"buffer", which has size "buffer_size". The received message has size
141    //!"recvd_size" and priority "priority". If the message queue is empty
142    //!the receiver is not blocked and returns false, otherwise returns true.
143    //!Throws interprocess_error on error.
144    bool try_receive (void *buffer,           size_type buffer_size,
145                      size_type &recvd_size,unsigned int &priority);
146
147    //!Receives a message from the message queue. The message is stored in buffer
148    //!"buffer", which has size "buffer_size". The received message has size
149    //!"recvd_size" and priority "priority". If the message queue is empty
150    //!the receiver retries until time "abs_time" is reached. Returns true if
151    //!the message has been successfully sent. Returns false if timeout is reached.
152    //!Throws interprocess_error on error.
153    bool timed_receive (void *buffer,           size_type buffer_size,
154                        size_type &recvd_size,unsigned int &priority,
155                        const boost::posix_time::ptime &abs_time);
156
157    //!Returns the maximum number of messages allowed by the queue. The message
158    //!queue must be opened or created previously. Otherwise, returns 0.
159    //!Never throws
160    size_type get_max_msg() const;
161
162    //!Returns the maximum size of message allowed by the queue. The message
163    //!queue must be opened or created previously. Otherwise, returns 0.
164    //!Never throws
165    size_type get_max_msg_size() const;
166
167    //!Returns the number of messages currently stored.
168    //!Never throws
169    size_type get_num_msg() const;
170
171    //!Removes the message queue from the system.
172    //!Returns false on error. Never throws
173    static bool remove(const char *name);
174
175    #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
176    private:
177    typedef boost::posix_time::ptime ptime;
178
179    friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
180
181    bool do_receive(block_t block,
182                    void *buffer,         size_type buffer_size,
183                    size_type &recvd_size, unsigned int &priority,
184                    const ptime &abs_time);
185
186    bool do_send(block_t block,
187                 const void *buffer,      size_type buffer_size,
188                 unsigned int priority,   const ptime &abs_time);
189
190    //!Returns the needed memory size for the shared message queue.
191    //!Never throws
192    static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
193    typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
194    open_create_impl_t m_shmem;
195    #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
196 };
197
198 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
199
200 namespace ipcdetail {
201
202 //!This header is the prefix of each message in the queue
203 template<class VoidPointer>
204 class msg_hdr_t
205 {
206    typedef VoidPointer                                                           void_pointer;
207    typedef typename boost::intrusive::
208       pointer_traits<void_pointer>::template
209          rebind_pointer<char>::type                                              char_ptr;
210    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type  difference_type;
211    typedef typename boost::make_unsigned<difference_type>::type                  size_type;
212
213    public:
214    size_type               len;     // Message length
215    unsigned int            priority;// Message priority
216    //!Returns the data buffer associated with this this message
217    void * data(){ return this+1; }  //
218 };
219
220 //!This functor is the predicate to order stored messages by priority
221 template<class VoidPointer>
222 class priority_functor
223 {
224    typedef typename boost::intrusive::
225       pointer_traits<VoidPointer>::template
226          rebind_pointer<msg_hdr_t<VoidPointer> >::type                  msg_hdr_ptr_t;
227
228    public:
229    bool operator()(const msg_hdr_ptr_t &msg1,
230                    const msg_hdr_ptr_t &msg2) const
231       {  return msg1->priority < msg2->priority;  }
232 };
233
234 //!This header is placed in the beginning of the shared memory and contains
235 //!the data to control the queue. This class initializes the shared memory
236 //!in the following way: in ascending memory address with proper alignment
237 //!fillings:
238 //!
239 //!-> mq_hdr_t:
240 //!   Main control block that controls the rest of the elements
241 //!
242 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
243 //!   An array of pointers with size "max_num_msg" called index. Each pointer
244 //!   points to a preallocated message. Elements of this array are
245 //!   reordered in runtime in the following way:
246 //!
247 //!   IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
248 //!
249 //!   When the current number of messages is "cur_num_msg", the array
250 //!   is treated like a circular buffer. Starting from position "cur_first_msg"
251 //!   "cur_num_msg" in a circular way, pointers point to inserted messages and the rest
252 //!   point to free messages. Those "cur_num_msg" pointers are
253 //!   ordered by the priority of the pointed message and by insertion order
254 //!   if two messages have the same priority. So the next message to be
255 //!   used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
256 //!   and the first free message ready to be used in a "send" operation is
257 //!   [cur_first_msg] if circular buffer is extended from front,
258 //!   [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
259 //!
260 //!   This transforms the index in a circular buffer with an embedded free
261 //!   message queue.
262 //!
263 //!   ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
264 //!
265 //!   When the current number of messages is "cur_num_msg", the first
266 //!   "cur_num_msg" pointers point to inserted messages and the rest
267 //!   point to free messages. The first "cur_num_msg" pointers are
268 //!   ordered by the priority of the pointed message and by insertion order
269 //!   if two messages have the same priority. So the next message to be
270 //!   used in a "receive" is pointed by index [cur_num_msg-1] and the first free
271 //!   message ready to be used in a "send" operation is index [cur_num_msg].
272 //!
273 //!   This transforms the index in a fixed size priority queue with an embedded free
274 //!   message queue.
275 //!
276 //!-> struct message_t
277 //!   {
278 //!      msg_hdr_t            header;
279 //!      char[max_msg_size]   data;
280 //!   } messages [max_num_msg];
281 //!
282 //!   An array of buffers of preallocated messages, each one prefixed with the
283 //!   msg_hdr_t structure. Each of this message is pointed by one pointer of
284 //!   the index structure.
285 template<class VoidPointer>
286 class mq_hdr_t
287    : public ipcdetail::priority_functor<VoidPointer>
288 {
289    typedef VoidPointer                                                     void_pointer;
290    typedef msg_hdr_t<void_pointer>                                         msg_header;
291    typedef typename boost::intrusive::
292       pointer_traits<void_pointer>::template
293          rebind_pointer<msg_header>::type                                  msg_hdr_ptr_t;
294    typedef typename boost::intrusive::pointer_traits
295       <msg_hdr_ptr_t>::difference_type                                     difference_type;
296    typedef typename boost::make_unsigned<difference_type>::type            size_type;
297    typedef typename boost::intrusive::
298       pointer_traits<void_pointer>::template
299          rebind_pointer<msg_hdr_ptr_t>::type                              msg_hdr_ptr_ptr_t;
300    typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
301
302    public:
303    //!Constructor. This object must be constructed in the beginning of the
304    //!shared memory of the size returned by the function "get_mem_size".
305    //!This constructor initializes the needed resources and creates
306    //!the internal structures like the priority index. This can throw.
307    mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
308       : m_max_num_msg(max_num_msg),
309          m_max_msg_size(max_msg_size),
310          m_cur_num_msg(0)
311          #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
312          ,m_cur_first_msg(0u)
313          ,m_blocked_senders(0u)
314          ,m_blocked_receivers(0u)
315          #endif
316       {  this->initialize_memory();  }
317
318    //!Returns true if the message queue is full
319    bool is_full() const
320       {  return m_cur_num_msg == m_max_num_msg;  }
321
322    //!Returns true if the message queue is empty
323    bool is_empty() const
324       {  return !m_cur_num_msg;  }
325
326    //!Frees the top priority message and saves it in the free message list
327    void free_top_msg()
328       {  --m_cur_num_msg;  }
329
330    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
331
332    typedef msg_hdr_ptr_t *iterator;
333
334    size_type end_pos() const
335    {
336       const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
337       return space_until_bufend > m_cur_num_msg
338          ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
339    }
340
341    //!Returns the inserted message with top priority
342    msg_header &top_msg()
343    {
344       size_type pos = this->end_pos();
345       return *mp_index[pos ? --pos : m_max_num_msg - 1];
346    }
347
348    //!Returns the inserted message with bottom priority
349    msg_header &bottom_msg()
350       {  return *mp_index[m_cur_first_msg];   }
351
352    iterator inserted_ptr_begin() const
353    {  return &mp_index[m_cur_first_msg]; }
354
355    iterator inserted_ptr_end() const
356       {  return &mp_index[this->end_pos()];  }
357
358    iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
359    {
360       iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
361       if(end < begin){
362          iterator idx_end = &mp_index[m_max_num_msg];
363          iterator ret = std::lower_bound(begin, idx_end, value, func);
364          if(idx_end == ret){
365             iterator idx_beg = &mp_index[0];
366             ret = std::lower_bound(idx_beg, end, value, func);
367             //sanity check, these cases should not call lower_bound (optimized out)
368             BOOST_ASSERT(ret != end);
369             BOOST_ASSERT(ret != begin);
370             return ret;
371          }
372          else{
373             return ret;
374          }
375       }
376       else{
377          return std::lower_bound(begin, end, value, func);
378       }
379    }
380
381    msg_header & insert_at(iterator where)
382    {
383       iterator it_inserted_ptr_end = this->inserted_ptr_end();
384       iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
385       if(where == it_inserted_ptr_beg){
386          //unsigned integer guarantees underflow
387          m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
388          --m_cur_first_msg;
389          ++m_cur_num_msg;
390          return *mp_index[m_cur_first_msg];
391       }
392       else if(where == it_inserted_ptr_end){
393          ++m_cur_num_msg;
394          return **it_inserted_ptr_end;
395       }
396       else{
397          size_type pos  = where - &mp_index[0];
398          size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
399          //Check if it's more efficient to move back or move front
400          if(circ_pos < m_cur_num_msg/2){
401             //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
402             //indicates two step insertion
403             if(!pos){
404                pos   = m_max_num_msg;
405                where = &mp_index[m_max_num_msg-1];
406             }
407             else{
408                --where;
409             }
410             const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
411             const size_type first_segment_beg  = unique_segment ? m_cur_first_msg : 1u;
412             const size_type first_segment_end  = pos;
413             const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
414             const size_type second_segment_end = m_max_num_msg;
415             const msg_hdr_ptr_t backup   = *(&mp_index[0] + (unique_segment ?  first_segment_beg : second_segment_beg) - 1);
416
417             //First segment
418             if(!unique_segment){
419                std::copy( &mp_index[0] + second_segment_beg
420                         , &mp_index[0] + second_segment_end
421                         , &mp_index[0] + second_segment_beg - 1);
422                mp_index[m_max_num_msg-1] = mp_index[0];
423             }
424             std::copy( &mp_index[0] + first_segment_beg
425                      , &mp_index[0] + first_segment_end
426                      , &mp_index[0] + first_segment_beg - 1);
427             *where = backup;
428             m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
429             --m_cur_first_msg;
430             ++m_cur_num_msg;
431             return **where;
432          }
433          else{
434             //The queue can't be full so end_pos < m_cur_first_msg
435             //indicates two step insertion
436             const size_type pos_end = this->end_pos();
437             const bool unique_segment = pos < pos_end;
438             const size_type first_segment_beg  = pos;
439             const size_type first_segment_end  = unique_segment  ? pos_end : m_max_num_msg-1;
440             const size_type second_segment_beg = 0u;
441             const size_type second_segment_end = unique_segment ? 0u : pos_end;
442             const msg_hdr_ptr_t backup   = *it_inserted_ptr_end;
443
444             //First segment
445             if(!unique_segment){
446                std::copy_backward( &mp_index[0] + second_segment_beg
447                                  , &mp_index[0] + second_segment_end
448                                  , &mp_index[0] + second_segment_end + 1);
449                mp_index[0] = mp_index[m_max_num_msg-1];
450             }
451             std::copy_backward( &mp_index[0] + first_segment_beg
452                               , &mp_index[0] + first_segment_end
453                               , &mp_index[0] + first_segment_end + 1);
454             *where = backup;
455             ++m_cur_num_msg;
456             return **where;
457          }
458       }
459    }
460
461    #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
462
463    typedef msg_hdr_ptr_t *iterator;
464
465    //!Returns the inserted message with top priority
466    msg_header &top_msg()
467       {  return *mp_index[m_cur_num_msg-1];   }
468
469    //!Returns the inserted message with bottom priority
470    msg_header &bottom_msg()
471       {  return *mp_index[0];   }
472
473    iterator inserted_ptr_begin() const
474    {  return &mp_index[0]; }
475
476    iterator inserted_ptr_end() const
477    {  return &mp_index[m_cur_num_msg]; }
478
479    iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
480    {  return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func);  }
481
482    msg_header & insert_at(iterator pos)
483    {
484       const msg_hdr_ptr_t backup = *inserted_ptr_end();
485       std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
486       *pos = backup;
487       ++m_cur_num_msg;
488       return **pos;
489    }
490
491    #endif   //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
492
493    //!Inserts the first free message in the priority queue
494    msg_header & queue_free_msg(unsigned int priority)
495    {
496       //Get priority queue's range
497       iterator it  (inserted_ptr_begin()), it_end(inserted_ptr_end());
498       //Optimize for non-priority usage
499       if(m_cur_num_msg && priority > this->bottom_msg().priority){
500          //Check for higher priority than all stored messages
501          if(priority > this->top_msg().priority){
502             it = it_end;
503          }
504          else{
505             //Since we don't now which free message we will pick
506             //build a dummy header for searches
507             msg_header dummy_hdr;
508             dummy_hdr.priority = priority;
509
510             //Get free msg
511             msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
512
513             //Check where the free message should be placed
514             it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
515          }
516       }
517       //Insert the free message in the correct position
518       return this->insert_at(it);
519    }
520
521    //!Returns the number of bytes needed to construct a message queue with
522    //!"max_num_size" maximum number of messages and "max_msg_size" maximum
523    //!message size. Never throws.
524    static size_type get_mem_size
525       (size_type max_msg_size, size_type max_num_msg)
526    {
527       const size_type
528        msg_hdr_align  = ::boost::alignment_of<msg_header>::value,
529        index_align    = ::boost::alignment_of<msg_hdr_ptr_t>::value,
530          r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
531          r_index_size   = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
532          r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
533       return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
534          open_create_impl_t::ManagedOpenOrCreateUserOffset;
535    }
536
537    //!Initializes the memory structures to preallocate messages and constructs the
538    //!message index. Never throws.
539    void initialize_memory()
540    {
541       const size_type
542         msg_hdr_align  = ::boost::alignment_of<msg_header>::value,
543         index_align    = ::boost::alignment_of<msg_hdr_ptr_t>::value,
544          r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
545          r_index_size   = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
546          r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
547
548       //Pointer to the index
549       msg_hdr_ptr_t *index =  reinterpret_cast<msg_hdr_ptr_t*>
550                                  (reinterpret_cast<char*>(this)+r_hdr_size);
551
552       //Pointer to the first message header
553       msg_header *msg_hdr   =  reinterpret_cast<msg_header*>
554                                  (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
555
556       //Initialize the pointer to the index
557       mp_index             = index;
558
559       //Initialize the index so each slot points to a preallocated message
560       for(size_type i = 0; i < m_max_num_msg; ++i){
561          index[i] = msg_hdr;
562          msg_hdr  = reinterpret_cast<msg_header*>
563                         (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
564       }
565    }
566
567    public:
568    //Pointer to the index
569    msg_hdr_ptr_ptr_t          mp_index;
570    //Maximum number of messages of the queue
571    const size_type            m_max_num_msg;
572    //Maximum size of messages of the queue
573    const size_type            m_max_msg_size;
574    //Current number of messages
575    size_type                  m_cur_num_msg;
576    //Mutex to protect data structures
577    interprocess_mutex         m_mutex;
578    //Condition block receivers when there are no messages
579    interprocess_condition     m_cond_recv;
580    //Condition block senders when the queue is full
581    interprocess_condition     m_cond_send;
582    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
583    //Current start offset in the circular index
584    size_type                  m_cur_first_msg;
585    size_type                  m_blocked_senders;
586    size_type                  m_blocked_receivers;
587    #endif
588 };
589
590
591 //!This is the atomic functor to be executed when creating or opening
592 //!shared memory. Never throws
593 template<class VoidPointer>
594 class msg_queue_initialization_func_t
595 {
596    public:
597    typedef typename boost::intrusive::
598       pointer_traits<VoidPointer>::template
599          rebind_pointer<char>::type                                    char_ptr;
600    typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
601    typedef typename boost::make_unsigned<difference_type>::type        size_type;
602
603    msg_queue_initialization_func_t(size_type maxmsg = 0,
604                          size_type maxmsgsize = 0)
605       : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
606
607    bool operator()(void *address, size_type, bool created)
608    {
609       char      *mptr;
610
611       if(created){
612          mptr     = reinterpret_cast<char*>(address);
613          //Construct the message queue header at the beginning
614          BOOST_TRY{
615             new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
616          }
617          BOOST_CATCH(...){
618             return false;
619          }
620          BOOST_CATCH_END
621       }
622       return true;
623    }
624
625    std::size_t get_min_size() const
626    {
627       return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
628       - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
629    }
630
631    const size_type m_maxmsg;
632    const size_type m_maxmsgsize;
633 };
634
635 }  //namespace ipcdetail {
636
637 template<class VoidPointer>
638 inline message_queue_t<VoidPointer>::~message_queue_t()
639 {}
640
641 template<class VoidPointer>
642 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
643    (size_type max_msg_size, size_type max_num_msg)
644 {  return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg);   }
645
646 template<class VoidPointer>
647 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
648                                     const char *name,
649                                     size_type max_num_msg,
650                                     size_type max_msg_size,
651                                     const permissions &perm)
652       //Create shared memory and execute functor atomically
653    :  m_shmem(create_only,
654               name,
655               get_mem_size(max_msg_size, max_num_msg),
656               read_write,
657               static_cast<void*>(0),
658               //Prepare initialization functor
659               ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
660               perm)
661 {}
662
663 template<class VoidPointer>
664 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
665                                     const char *name,
666                                     size_type max_num_msg,
667                                     size_type max_msg_size,
668                                     const permissions &perm)
669       //Create shared memory and execute functor atomically
670    :  m_shmem(open_or_create,
671               name,
672               get_mem_size(max_msg_size, max_num_msg),
673               read_write,
674               static_cast<void*>(0),
675               //Prepare initialization functor
676               ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
677               perm)
678 {}
679
680 template<class VoidPointer>
681 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
682    //Create shared memory and execute functor atomically
683    :  m_shmem(open_only,
684               name,
685               read_write,
686               static_cast<void*>(0),
687               //Prepare initialization functor
688               ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
689 {}
690
691 template<class VoidPointer>
692 inline void message_queue_t<VoidPointer>::send
693    (const void *buffer, size_type buffer_size, unsigned int priority)
694 {  this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
695
696 template<class VoidPointer>
697 inline bool message_queue_t<VoidPointer>::try_send
698    (const void *buffer, size_type buffer_size, unsigned int priority)
699 {  return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
700
701 template<class VoidPointer>
702 inline bool message_queue_t<VoidPointer>::timed_send
703    (const void *buffer, size_type buffer_size
704    ,unsigned int priority, const boost::posix_time::ptime &abs_time)
705 {
706    if(abs_time == boost::posix_time::pos_infin){
707       this->send(buffer, buffer_size, priority);
708       return true;
709    }
710    return this->do_send(timed, buffer, buffer_size, priority, abs_time);
711 }
712
713 template<class VoidPointer>
714 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
715                                 const void *buffer,      size_type buffer_size,
716                                 unsigned int priority,   const boost::posix_time::ptime &abs_time)
717 {
718    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
719    //Check if buffer is smaller than maximum allowed
720    if (buffer_size > p_hdr->m_max_msg_size) {
721       throw interprocess_exception(size_error);
722    }
723
724    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
725    bool notify_blocked_receivers = false;
726    #endif
727    //---------------------------------------------
728    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
729    //---------------------------------------------
730    {
731       //If the queue is full execute blocking logic
732       if (p_hdr->is_full()) {
733          BOOST_TRY{
734             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
735             ++p_hdr->m_blocked_senders;
736             #endif
737             switch(block){
738                case non_blocking :
739                   #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
740                   --p_hdr->m_blocked_senders;
741                   #endif
742                   return false;
743                break;
744
745                case blocking :
746                   do{
747                      p_hdr->m_cond_send.wait(lock);
748                   }
749                   while (p_hdr->is_full());
750                break;
751
752                case timed :
753                   do{
754                      if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
755                         if(p_hdr->is_full()){
756                            #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
757                            --p_hdr->m_blocked_senders;
758                            #endif
759                            return false;
760                         }
761                         break;
762                      }
763                   }
764                   while (p_hdr->is_full());
765                break;
766                default:
767                break;
768             }
769             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
770             --p_hdr->m_blocked_senders;
771             #endif
772          }
773          BOOST_CATCH(...){
774             #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
775             --p_hdr->m_blocked_senders;
776             #endif
777             BOOST_RETHROW;
778          }
779          BOOST_CATCH_END
780       }
781
782       #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
783       notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
784       #endif
785       //Insert the first free message in the priority queue
786       ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
787
788       //Sanity check, free msgs are always cleaned when received
789       BOOST_ASSERT(free_msg_hdr.priority == 0);
790       BOOST_ASSERT(free_msg_hdr.len == 0);
791
792       //Copy control data to the free message
793       free_msg_hdr.priority = priority;
794       free_msg_hdr.len      = buffer_size;
795
796       //Copy user buffer to the message
797       std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
798    }  // Lock end
799
800    //Notify outside lock to avoid contention. This might produce some
801    //spurious wakeups, but it's usually far better than notifying inside.
802    //If this message changes the queue empty state, notify it to receivers
803    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
804    if (notify_blocked_receivers){
805       p_hdr->m_cond_recv.notify_one();
806    }
807    #else
808    p_hdr->m_cond_recv.notify_one();
809    #endif
810
811    return true;
812 }
813
814 template<class VoidPointer>
815 inline void message_queue_t<VoidPointer>::receive(void *buffer,        size_type buffer_size,
816                         size_type &recvd_size,   unsigned int &priority)
817 {  this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
818
819 template<class VoidPointer>
820 inline bool
821    message_queue_t<VoidPointer>::try_receive(void *buffer,              size_type buffer_size,
822                               size_type &recvd_size,   unsigned int &priority)
823 {  return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
824
825 template<class VoidPointer>
826 inline bool
827    message_queue_t<VoidPointer>::timed_receive(void *buffer,            size_type buffer_size,
828                                 size_type &recvd_size,   unsigned int &priority,
829                                 const boost::posix_time::ptime &abs_time)
830 {
831    if(abs_time == boost::posix_time::pos_infin){
832       this->receive(buffer, buffer_size, recvd_size, priority);
833       return true;
834    }
835    return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
836 }
837
838 template<class VoidPointer>
839 inline bool
840    message_queue_t<VoidPointer>::do_receive(block_t block,
841                           void *buffer,            size_type buffer_size,
842                           size_type &recvd_size,   unsigned int &priority,
843                           const boost::posix_time::ptime &abs_time)
844 {
845    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
846    //Check if buffer is big enough for any message
847    if (buffer_size < p_hdr->m_max_msg_size) {
848       throw interprocess_exception(size_error);
849    }
850
851    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
852    bool notify_blocked_senders = false;
853    #endif
854    //---------------------------------------------
855    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
856    //---------------------------------------------
857    {
858       //If there are no messages execute blocking logic
859       if (p_hdr->is_empty()) {
860          BOOST_TRY{
861             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
862             ++p_hdr->m_blocked_receivers;
863             #endif
864             switch(block){
865                case non_blocking :
866                   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
867                   --p_hdr->m_blocked_receivers;
868                   #endif
869                   return false;
870                break;
871
872                case blocking :
873                   do{
874                      p_hdr->m_cond_recv.wait(lock);
875                   }
876                   while (p_hdr->is_empty());
877                break;
878
879                case timed :
880                   do{
881                      if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
882                         if(p_hdr->is_empty()){
883                            #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
884                            --p_hdr->m_blocked_receivers;
885                            #endif
886                            return false;
887                         }
888                         break;
889                      }
890                   }
891                   while (p_hdr->is_empty());
892                break;
893
894                //Paranoia check
895                default:
896                break;
897             }
898             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
899             --p_hdr->m_blocked_receivers;
900             #endif
901          }
902          BOOST_CATCH(...){
903             #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
904             --p_hdr->m_blocked_receivers;
905             #endif
906             BOOST_RETHROW;
907          }
908          BOOST_CATCH_END
909       }
910
911       #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
912       notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
913       #endif
914
915       //There is at least one message ready to pick, get the top one
916       ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
917
918       //Get data from the message
919       recvd_size     = top_msg.len;
920       priority       = top_msg.priority;
921
922       //Some cleanup to ease debugging
923       top_msg.len       = 0;
924       top_msg.priority  = 0;
925
926       //Copy data to receiver's bufers
927       std::memcpy(buffer, top_msg.data(), recvd_size);
928
929       //Free top message and put it in the free message list
930       p_hdr->free_top_msg();
931    }  //Lock end
932
933    //Notify outside lock to avoid contention. This might produce some
934    //spurious wakeups, but it's usually far better than notifying inside.
935    //If this reception changes the queue full state, notify senders
936    #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
937    if (notify_blocked_senders){
938       p_hdr->m_cond_send.notify_one();
939    }
940    #else
941    p_hdr->m_cond_send.notify_one();
942    #endif
943
944    return true;
945 }
946
947 template<class VoidPointer>
948 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
949 {
950    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
951    return p_hdr ? p_hdr->m_max_num_msg : 0;  }
952
953 template<class VoidPointer>
954 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
955 {
956    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
957    return p_hdr ? p_hdr->m_max_msg_size : 0;
958 }
959
960 template<class VoidPointer>
961 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
962 {
963    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
964    if(p_hdr){
965       //---------------------------------------------
966       scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
967       //---------------------------------------------
968       return p_hdr->m_cur_num_msg;
969    }
970
971    return 0;
972 }
973
974 template<class VoidPointer>
975 inline bool message_queue_t<VoidPointer>::remove(const char *name)
976 {  return shared_memory_object::remove(name);  }
977
978 #else
979
980 //!Typedef for a default message queue
981 //!to be used between processes
982 typedef message_queue_t<offset_ptr<void> > message_queue;
983
984 #endif   //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
985
986 }} //namespace boost{  namespace interprocess{
987
988 #include <boost/interprocess/detail/config_end.hpp>
989
990 #endif   //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP