change support python version
[platform/upstream/boost.git] / boost / lockfree / spsc_queue.hpp
1 //  lock-free single-producer/single-consumer ringbuffer
2 //  this algorithm is implemented in various projects (linux kernel)
3 //
4 //  Copyright (C) 2009-2013 Tim Blechmann
5 //
6 //  Distributed under the Boost Software License, Version 1.0. (See
7 //  accompanying file LICENSE_1_0.txt or copy at
8 //  http://www.boost.org/LICENSE_1_0.txt)
9
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12
13 #include <algorithm>
14 #include <memory>
15
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/utility.hpp>
20 #include <boost/next_prior.hpp>
21 #include <boost/utility/enable_if.hpp>
22 #include <boost/config.hpp> // for BOOST_LIKELY
23
24 #include <boost/type_traits/has_trivial_destructor.hpp>
25 #include <boost/type_traits/is_convertible.hpp>
26
27 #include <boost/lockfree/detail/allocator_rebind_helper.hpp>
28 #include <boost/lockfree/detail/atomic.hpp>
29 #include <boost/lockfree/detail/copy_payload.hpp>
30 #include <boost/lockfree/detail/parameter.hpp>
31 #include <boost/lockfree/detail/prefix.hpp>
32
33 #include <boost/lockfree/lockfree_forward.hpp>
34
35 #ifdef BOOST_HAS_PRAGMA_ONCE
36 #pragma once
37 #endif
38
39 namespace boost    {
40 namespace lockfree {
41 namespace detail   {
42
43 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
44                               boost::parameter::optional<tag::allocator>
45                              > ringbuffer_signature;
46
47 template <typename T>
48 class ringbuffer_base
49 {
50 #ifndef BOOST_DOXYGEN_INVOKED
51 protected:
52     typedef std::size_t size_t;
53     static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
54     atomic<size_t> write_index_;
55     char padding1[padding_size]; /* force read_index and write_index to different cache lines */
56     atomic<size_t> read_index_;
57
58     BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
59     BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))
60
61 protected:
62     ringbuffer_base(void):
63         write_index_(0), read_index_(0)
64     {}
65
66     static size_t next_index(size_t arg, size_t max_size)
67     {
68         size_t ret = arg + 1;
69         while (BOOST_UNLIKELY(ret >= max_size))
70             ret -= max_size;
71         return ret;
72     }
73
74     static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
75     {
76         if (write_index >= read_index)
77             return write_index - read_index;
78
79         const size_t ret = write_index + max_size - read_index;
80         return ret;
81     }
82
83     static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
84     {
85         size_t ret = read_index - write_index - 1;
86         if (write_index >= read_index)
87             ret += max_size;
88         return ret;
89     }
90
91     size_t read_available(size_t max_size) const
92     {
93         size_t write_index = write_index_.load(memory_order_acquire);
94         const size_t read_index  = read_index_.load(memory_order_relaxed);
95         return read_available(write_index, read_index, max_size);
96     }
97
98     size_t write_available(size_t max_size) const
99     {
100         size_t write_index = write_index_.load(memory_order_relaxed);
101         const size_t read_index  = read_index_.load(memory_order_acquire);
102         return write_available(write_index, read_index, max_size);
103     }
104
105     bool push(T const & t, T * buffer, size_t max_size)
106     {
107         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
108         const size_t next = next_index(write_index, max_size);
109
110         if (next == read_index_.load(memory_order_acquire))
111             return false; /* ringbuffer is full */
112
113         new (buffer + write_index) T(t); // copy-construct
114
115         write_index_.store(next, memory_order_release);
116
117         return true;
118     }
119
120     size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
121     {
122         return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
123     }
124
125     template <typename ConstIterator>
126     ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
127     {
128         // FIXME: avoid std::distance
129
130         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
131         const size_t read_index  = read_index_.load(memory_order_acquire);
132         const size_t avail = write_available(write_index, read_index, max_size);
133
134         if (avail == 0)
135             return begin;
136
137         size_t input_count = std::distance(begin, end);
138         input_count = (std::min)(input_count, avail);
139
140         size_t new_write_index = write_index + input_count;
141
142         const ConstIterator last = boost::next(begin, input_count);
143
144         if (write_index + input_count > max_size) {
145             /* copy data in two sections */
146             const size_t count0 = max_size - write_index;
147             const ConstIterator midpoint = boost::next(begin, count0);
148
149             std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
150             std::uninitialized_copy(midpoint, last, internal_buffer);
151             new_write_index -= max_size;
152         } else {
153             std::uninitialized_copy(begin, last, internal_buffer + write_index);
154
155             if (new_write_index == max_size)
156                 new_write_index = 0;
157         }
158
159         write_index_.store(new_write_index, memory_order_release);
160         return last;
161     }
162
163     template <typename Functor>
164     bool consume_one(Functor & functor, T * buffer, size_t max_size)
165     {
166         const size_t write_index = write_index_.load(memory_order_acquire);
167         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
168         if ( empty(write_index, read_index) )
169             return false;
170
171         T & object_to_consume = buffer[read_index];
172         functor( object_to_consume );
173         object_to_consume.~T();
174
175         size_t next = next_index(read_index, max_size);
176         read_index_.store(next, memory_order_release);
177         return true;
178     }
179
180     template <typename Functor>
181     bool consume_one(Functor const & functor, T * buffer, size_t max_size)
182     {
183         const size_t write_index = write_index_.load(memory_order_acquire);
184         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
185         if ( empty(write_index, read_index) )
186             return false;
187
188         T & object_to_consume = buffer[read_index];
189         functor( object_to_consume );
190         object_to_consume.~T();
191
192         size_t next = next_index(read_index, max_size);
193         read_index_.store(next, memory_order_release);
194         return true;
195     }
196
197     template <typename Functor>
198     size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
199     {
200         const size_t write_index = write_index_.load(memory_order_acquire);
201         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
202
203         const size_t avail = read_available(write_index, read_index, max_size);
204
205         if (avail == 0)
206             return 0;
207
208         const size_t output_count = avail;
209
210         size_t new_read_index = read_index + output_count;
211
212         if (read_index + output_count > max_size) {
213             /* copy data in two sections */
214             const size_t count0 = max_size - read_index;
215             const size_t count1 = output_count - count0;
216
217             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
218             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
219
220             new_read_index -= max_size;
221         } else {
222             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
223
224             if (new_read_index == max_size)
225                 new_read_index = 0;
226         }
227
228         read_index_.store(new_read_index, memory_order_release);
229         return output_count;
230     }
231
232     template <typename Functor>
233     size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
234     {
235         const size_t write_index = write_index_.load(memory_order_acquire);
236         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
237
238         const size_t avail = read_available(write_index, read_index, max_size);
239
240         if (avail == 0)
241             return 0;
242
243         const size_t output_count = avail;
244
245         size_t new_read_index = read_index + output_count;
246
247         if (read_index + output_count > max_size) {
248             /* copy data in two sections */
249             const size_t count0 = max_size - read_index;
250             const size_t count1 = output_count - count0;
251
252             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
253             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
254
255             new_read_index -= max_size;
256         } else {
257             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
258
259             if (new_read_index == max_size)
260                 new_read_index = 0;
261         }
262
263         read_index_.store(new_read_index, memory_order_release);
264         return output_count;
265     }
266
267     size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
268     {
269         const size_t write_index = write_index_.load(memory_order_acquire);
270         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
271
272         const size_t avail = read_available(write_index, read_index, max_size);
273
274         if (avail == 0)
275             return 0;
276
277         output_count = (std::min)(output_count, avail);
278
279         size_t new_read_index = read_index + output_count;
280
281         if (read_index + output_count > max_size) {
282             /* copy data in two sections */
283             const size_t count0 = max_size - read_index;
284             const size_t count1 = output_count - count0;
285
286             copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
287             copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
288
289             new_read_index -= max_size;
290         } else {
291             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
292             if (new_read_index == max_size)
293                 new_read_index = 0;
294         }
295
296         read_index_.store(new_read_index, memory_order_release);
297         return output_count;
298     }
299
300     template <typename OutputIterator>
301     size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
302     {
303         const size_t write_index = write_index_.load(memory_order_acquire);
304         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
305
306         const size_t avail = read_available(write_index, read_index, max_size);
307         if (avail == 0)
308             return 0;
309
310         size_t new_read_index = read_index + avail;
311
312         if (read_index + avail > max_size) {
313             /* copy data in two sections */
314             const size_t count0 = max_size - read_index;
315             const size_t count1 = avail - count0;
316
317             it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
318             copy_and_delete(internal_buffer, internal_buffer + count1, it);
319
320             new_read_index -= max_size;
321         } else {
322             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
323             if (new_read_index == max_size)
324                 new_read_index = 0;
325         }
326
327         read_index_.store(new_read_index, memory_order_release);
328         return avail;
329     }
330
331     const T& front(const T * internal_buffer) const
332     {
333         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
334         return *(internal_buffer + read_index);
335     }
336
337     T& front(T * internal_buffer)
338     {
339         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
340         return *(internal_buffer + read_index);
341     }
342 #endif
343
344
345 public:
346     /** reset the ringbuffer
347      *
348      * \note Not thread-safe
349      * */
350     void reset(void)
351     {
352         if ( !boost::has_trivial_destructor<T>::value ) {
353             // make sure to call all destructors!
354
355             T dummy_element;
356             while (pop(dummy_element))
357             {}
358         } else {
359             write_index_.store(0, memory_order_relaxed);
360             read_index_.store(0, memory_order_release);
361         }
362     }
363
364     /** Check if the ringbuffer is empty
365      *
366      * \return true, if the ringbuffer is empty, false otherwise
367      * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
368      * */
369     bool empty(void)
370     {
371         return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
372     }
373
374     /**
375      * \return true, if implementation is lock-free.
376      *
377      * */
378     bool is_lock_free(void) const
379     {
380         return write_index_.is_lock_free() && read_index_.is_lock_free();
381     }
382
383 private:
384     bool empty(size_t write_index, size_t read_index)
385     {
386         return write_index == read_index;
387     }
388
389     template< class OutputIterator >
390     OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
391     {
392         if (boost::has_trivial_destructor<T>::value) {
393             return std::copy(first, last, out); // will use memcpy if possible
394         } else {
395             for (; first != last; ++first, ++out) {
396                 *out = *first;
397                 first->~T();
398             }
399             return out;
400         }
401     }
402
403     template< class Functor >
404     void run_functor_and_delete( T * first, T * last, Functor & functor )
405     {
406         for (; first != last; ++first) {
407             functor(*first);
408             first->~T();
409         }
410     }
411
412     template< class Functor >
413     void run_functor_and_delete( T * first, T * last, Functor const & functor )
414     {
415         for (; first != last; ++first) {
416             functor(*first);
417             first->~T();
418         }
419     }
420 };
421
422 template <typename T, std::size_t MaxSize>
423 class compile_time_sized_ringbuffer:
424     public ringbuffer_base<T>
425 {
426     typedef std::size_t size_type;
427     static const std::size_t max_size = MaxSize + 1;
428
429     typedef typename boost::aligned_storage<max_size * sizeof(T),
430                                             boost::alignment_of<T>::value
431                                            >::type storage_type;
432
433     storage_type storage_;
434
435     T * data()
436     {
437         return static_cast<T*>(storage_.address());
438     }
439
440     const T * data() const
441     {
442         return static_cast<const T*>(storage_.address());
443     }
444
445 protected:
446     size_type max_number_of_elements() const
447     {
448         return max_size;
449     }
450
451 public:
452     bool push(T const & t)
453     {
454         return ringbuffer_base<T>::push(t, data(), max_size);
455     }
456
457     template <typename Functor>
458     bool consume_one(Functor & f)
459     {
460         return ringbuffer_base<T>::consume_one(f, data(), max_size);
461     }
462
463     template <typename Functor>
464     bool consume_one(Functor const & f)
465     {
466         return ringbuffer_base<T>::consume_one(f, data(), max_size);
467     }
468
469     template <typename Functor>
470     size_type consume_all(Functor & f)
471     {
472         return ringbuffer_base<T>::consume_all(f, data(), max_size);
473     }
474
475     template <typename Functor>
476     size_type consume_all(Functor const & f)
477     {
478         return ringbuffer_base<T>::consume_all(f, data(), max_size);
479     }
480
481     size_type push(T const * t, size_type size)
482     {
483         return ringbuffer_base<T>::push(t, size, data(), max_size);
484     }
485
486     template <size_type size>
487     size_type push(T const (&t)[size])
488     {
489         return push(t, size);
490     }
491
492     template <typename ConstIterator>
493     ConstIterator push(ConstIterator begin, ConstIterator end)
494     {
495         return ringbuffer_base<T>::push(begin, end, data(), max_size);
496     }
497
498     size_type pop(T * ret, size_type size)
499     {
500         return ringbuffer_base<T>::pop(ret, size, data(), max_size);
501     }
502
503     template <typename OutputIterator>
504     size_type pop_to_output_iterator(OutputIterator it)
505     {
506         return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
507     }
508
509     const T& front(void) const
510     {
511         return ringbuffer_base<T>::front(data());
512     }
513
514     T& front(void)
515     {
516         return ringbuffer_base<T>::front(data());
517     }
518 };
519
520 template <typename T, typename Alloc>
521 class runtime_sized_ringbuffer:
522     public ringbuffer_base<T>,
523     private Alloc
524 {
525     typedef std::size_t size_type;
526     size_type max_elements_;
527 #ifdef BOOST_NO_CXX11_ALLOCATOR
528     typedef typename Alloc::pointer pointer;
529 #else
530     typedef std::allocator_traits<Alloc> allocator_traits;
531     typedef typename allocator_traits::pointer pointer;
532 #endif
533     pointer array_;
534
535 protected:
536     size_type max_number_of_elements() const
537     {
538         return max_elements_;
539     }
540
541 public:
542     explicit runtime_sized_ringbuffer(size_type max_elements):
543         max_elements_(max_elements + 1)
544     {
545 #ifdef BOOST_NO_CXX11_ALLOCATOR
546         array_ = Alloc::allocate(max_elements_);
547 #else
548         Alloc& alloc = *this;
549         array_ = allocator_traits::allocate(alloc, max_elements_);
550 #endif
551     }
552
553     template <typename U>
554     runtime_sized_ringbuffer(typename detail::allocator_rebind_helper<Alloc, U>::type const & alloc, size_type max_elements):
555         Alloc(alloc), max_elements_(max_elements + 1)
556     {
557 #ifdef BOOST_NO_CXX11_ALLOCATOR
558         array_ = Alloc::allocate(max_elements_);
559 #else
560         Alloc& allocator = *this;
561         array_ = allocator_traits::allocate(allocator, max_elements_);
562 #endif
563     }
564
565     runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
566         Alloc(alloc), max_elements_(max_elements + 1)
567     {
568 #ifdef BOOST_NO_CXX11_ALLOCATOR
569         array_ = Alloc::allocate(max_elements_);
570 #else
571         Alloc& allocator = *this;
572         array_ = allocator_traits::allocate(allocator, max_elements_);
573 #endif
574     }
575
576     ~runtime_sized_ringbuffer(void)
577     {
578         // destroy all remaining items
579         T out;
580         while (pop(&out, 1)) {}
581
582 #ifdef BOOST_NO_CXX11_ALLOCATOR
583         Alloc::deallocate(array_, max_elements_);
584 #else
585         Alloc& allocator = *this;
586         allocator_traits::deallocate(allocator, array_, max_elements_);
587 #endif
588     }
589
590     bool push(T const & t)
591     {
592         return ringbuffer_base<T>::push(t, &*array_, max_elements_);
593     }
594
595     template <typename Functor>
596     bool consume_one(Functor & f)
597     {
598         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
599     }
600
601     template <typename Functor>
602     bool consume_one(Functor const & f)
603     {
604         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
605     }
606
607     template <typename Functor>
608     size_type consume_all(Functor & f)
609     {
610         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
611     }
612
613     template <typename Functor>
614     size_type consume_all(Functor const & f)
615     {
616         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
617     }
618
619     size_type push(T const * t, size_type size)
620     {
621         return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
622     }
623
624     template <size_type size>
625     size_type push(T const (&t)[size])
626     {
627         return push(t, size);
628     }
629
630     template <typename ConstIterator>
631     ConstIterator push(ConstIterator begin, ConstIterator end)
632     {
633         return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
634     }
635
636     size_type pop(T * ret, size_type size)
637     {
638         return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
639     }
640
641     template <typename OutputIterator>
642     size_type pop_to_output_iterator(OutputIterator it)
643     {
644         return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
645     }
646
647     const T& front(void) const
648     {
649         return ringbuffer_base<T>::front(&*array_);
650     }
651
652     T& front(void)
653     {
654         return ringbuffer_base<T>::front(&*array_);
655     }
656 };
657
658 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
659 template <typename T, typename A0, typename A1>
660 #else
661 template <typename T, typename ...Options>
662 #endif
663 struct make_ringbuffer
664 {
665 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
666     typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
667 #else
668     typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
669 #endif
670
671     typedef extract_capacity<bound_args> extract_capacity_t;
672
673     static const bool runtime_sized = !extract_capacity_t::has_capacity;
674     static const size_t capacity    =  extract_capacity_t::capacity;
675
676     typedef extract_allocator<bound_args, T> extract_allocator_t;
677     typedef typename extract_allocator_t::type allocator;
678
679     // allocator argument is only sane, for run-time sized ringbuffers
680     BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
681                                   mpl::bool_<!extract_allocator_t::has_allocator>,
682                                   mpl::true_
683                                  >::type::value));
684
685     typedef typename mpl::if_c<runtime_sized,
686                                runtime_sized_ringbuffer<T, allocator>,
687                                compile_time_sized_ringbuffer<T, capacity>
688                               >::type ringbuffer_type;
689 };
690
691
692 } /* namespace detail */
693
694
695 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
696  *
697  *  \b Policies:
698  *  - \c boost::lockfree::capacity<>, optional <br>
699  *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
700  *
701  *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
702  *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
703  *    to be sized at run-time
704  *
705  *  \b Requirements:
706  *  - T must have a default constructor
707  *  - T must be copyable
708  * */
709 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
710 template <typename T, class A0, class A1>
711 #else
712 template <typename T, typename ...Options>
713 #endif
714 class spsc_queue:
715 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
716     public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
717 #else
718     public detail::make_ringbuffer<T, Options...>::ringbuffer_type
719 #endif
720 {
721 private:
722
723 #ifndef BOOST_DOXYGEN_INVOKED
724
725 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
726     typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
727     static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
728     typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
729 #else
730     typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
731     static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
732     typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
733 #endif
734
735
736     struct implementation_defined
737     {
738         typedef allocator_arg allocator;
739         typedef std::size_t size_type;
740     };
741 #endif
742
743 public:
744     typedef T value_type;
745     typedef typename implementation_defined::allocator allocator;
746     typedef typename implementation_defined::size_type size_type;
747
748     /** Constructs a spsc_queue
749      *
750      *  \pre spsc_queue must be configured to be sized at compile-time
751      */
752     // @{
753     spsc_queue(void)
754     {
755         BOOST_ASSERT(!runtime_sized);
756     }
757
758     template <typename U>
759     explicit spsc_queue(typename detail::allocator_rebind_helper<allocator, U>::type const &)
760     {
761         // just for API compatibility: we don't actually need an allocator
762         BOOST_STATIC_ASSERT(!runtime_sized);
763     }
764
765     explicit spsc_queue(allocator const &)
766     {
767         // just for API compatibility: we don't actually need an allocator
768         BOOST_ASSERT(!runtime_sized);
769     }
770     // @}
771
772
773     /** Constructs a spsc_queue for element_count elements
774      *
775      *  \pre spsc_queue must be configured to be sized at run-time
776      */
777     // @{
778     explicit spsc_queue(size_type element_count):
779         base_type(element_count)
780     {
781         BOOST_ASSERT(runtime_sized);
782     }
783
784     template <typename U>
785     spsc_queue(size_type element_count, typename detail::allocator_rebind_helper<allocator, U>::type const & alloc):
786         base_type(alloc, element_count)
787     {
788         BOOST_STATIC_ASSERT(runtime_sized);
789     }
790
791     spsc_queue(size_type element_count, allocator_arg const & alloc):
792         base_type(alloc, element_count)
793     {
794         BOOST_ASSERT(runtime_sized);
795     }
796     // @}
797
798     /** Pushes object t to the ringbuffer.
799      *
800      * \pre only one thread is allowed to push data to the spsc_queue
801      * \post object will be pushed to the spsc_queue, unless it is full.
802      * \return true, if the push operation is successful.
803      *
804      * \note Thread-safe and wait-free
805      * */
806     bool push(T const & t)
807     {
808         return base_type::push(t);
809     }
810
811     /** Pops one object from ringbuffer.
812      *
813      * \pre only one thread is allowed to pop data to the spsc_queue
814      * \post if ringbuffer is not empty, object will be discarded.
815      * \return true, if the pop operation is successful, false if ringbuffer was empty.
816      *
817      * \note Thread-safe and wait-free
818      */
819     bool pop ()
820     {
821         detail::consume_noop consume_functor;
822         return consume_one( consume_functor );
823     }
824
825     /** Pops one object from ringbuffer.
826      *
827      * \pre only one thread is allowed to pop data to the spsc_queue
828      * \post if ringbuffer is not empty, object will be copied to ret.
829      * \return true, if the pop operation is successful, false if ringbuffer was empty.
830      *
831      * \note Thread-safe and wait-free
832      */
833     template <typename U>
834     typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
835     pop (U & ret)
836     {
837         detail::consume_via_copy<U> consume_functor(ret);
838         return consume_one( consume_functor );
839     }
840
841     /** Pushes as many objects from the array t as there is space.
842      *
843      * \pre only one thread is allowed to push data to the spsc_queue
844      * \return number of pushed items
845      *
846      * \note Thread-safe and wait-free
847      */
848     size_type push(T const * t, size_type size)
849     {
850         return base_type::push(t, size);
851     }
852
853     /** Pushes as many objects from the array t as there is space available.
854      *
855      * \pre only one thread is allowed to push data to the spsc_queue
856      * \return number of pushed items
857      *
858      * \note Thread-safe and wait-free
859      */
860     template <size_type size>
861     size_type push(T const (&t)[size])
862     {
863         return push(t, size);
864     }
865
866     /** Pushes as many objects from the range [begin, end) as there is space .
867      *
868      * \pre only one thread is allowed to push data to the spsc_queue
869      * \return iterator to the first element, which has not been pushed
870      *
871      * \note Thread-safe and wait-free
872      */
873     template <typename ConstIterator>
874     ConstIterator push(ConstIterator begin, ConstIterator end)
875     {
876         return base_type::push(begin, end);
877     }
878
879     /** Pops a maximum of size objects from ringbuffer.
880      *
881      * \pre only one thread is allowed to pop data to the spsc_queue
882      * \return number of popped items
883      *
884      * \note Thread-safe and wait-free
885      * */
886     size_type pop(T * ret, size_type size)
887     {
888         return base_type::pop(ret, size);
889     }
890
891     /** Pops a maximum of size objects from spsc_queue.
892      *
893      * \pre only one thread is allowed to pop data to the spsc_queue
894      * \return number of popped items
895      *
896      * \note Thread-safe and wait-free
897      * */
898     template <size_type size>
899     size_type pop(T (&ret)[size])
900     {
901         return pop(ret, size);
902     }
903
904     /** Pops objects to the output iterator it
905      *
906      * \pre only one thread is allowed to pop data to the spsc_queue
907      * \return number of popped items
908      *
909      * \note Thread-safe and wait-free
910      * */
911     template <typename OutputIterator>
912     typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
913     pop(OutputIterator it)
914     {
915         return base_type::pop_to_output_iterator(it);
916     }
917
918     /** consumes one element via a functor
919      *
920      *  pops one element from the queue and applies the functor on this object
921      *
922      * \returns true, if one element was consumed
923      *
924      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
925      * */
926     template <typename Functor>
927     bool consume_one(Functor & f)
928     {
929         return base_type::consume_one(f);
930     }
931
932     /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
933     template <typename Functor>
934     bool consume_one(Functor const & f)
935     {
936         return base_type::consume_one(f);
937     }
938
939     /** consumes all elements via a functor
940      *
941      * sequentially pops all elements from the queue and applies the functor on each object
942      *
943      * \returns number of elements that are consumed
944      *
945      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
946      * */
947     template <typename Functor>
948     size_type consume_all(Functor & f)
949     {
950         return base_type::consume_all(f);
951     }
952
953     /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
954     template <typename Functor>
955     size_type consume_all(Functor const & f)
956     {
957         return base_type::consume_all(f);
958     }
959
960     /** get number of elements that are available for read
961      *
962      * \return number of available elements that can be popped from the spsc_queue
963      *
964      * \note Thread-safe and wait-free, should only be called from the consumer thread
965      * */
966     size_type read_available() const
967     {
968         return base_type::read_available(base_type::max_number_of_elements());
969     }
970
971     /** get write space to write elements
972      *
973      * \return number of elements that can be pushed to the spsc_queue
974      *
975      * \note Thread-safe and wait-free, should only be called from the producer thread
976      * */
977     size_type write_available() const
978     {
979         return base_type::write_available(base_type::max_number_of_elements());
980     }
981
982     /** get reference to element in the front of the queue
983      *
984      * Availability of front element can be checked using read_available().
985      *
986      * \pre only a consuming thread is allowed to check front element
987      * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
988      * \return reference to the first element in the queue
989      *
990      * \note Thread-safe and wait-free
991      */
992     const T& front() const
993     {
994         BOOST_ASSERT(read_available() > 0);
995         return base_type::front();
996     }
997
998     /// \copydoc boost::lockfree::spsc_queue::front() const
999     T& front()
1000     {
1001         BOOST_ASSERT(read_available() > 0);
1002         return base_type::front();
1003     }
1004
1005     /** reset the ringbuffer
1006      *
1007      * \note Not thread-safe
1008      * */
1009     void reset(void)
1010     {
1011         if ( !boost::has_trivial_destructor<T>::value ) {
1012             // make sure to call all destructors!
1013
1014             T dummy_element;
1015             while (pop(dummy_element))
1016             {}
1017         } else {
1018             base_type::write_index_.store(0, memory_order_relaxed);
1019             base_type::read_index_.store(0, memory_order_release);
1020         }
1021    }
1022 };
1023
1024 } /* namespace lockfree */
1025 } /* namespace boost */
1026
1027
1028 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */