Imported Upstream version 1.57.0
[platform/upstream/boost.git] / boost / lockfree / spsc_queue.hpp
1 //  lock-free single-producer/single-consumer ringbuffer
2 //  this algorithm is implemented in various projects (linux kernel)
3 //
4 //  Copyright (C) 2009-2013 Tim Blechmann
5 //
6 //  Distributed under the Boost Software License, Version 1.0. (See
7 //  accompanying file LICENSE_1_0.txt or copy at
8 //  http://www.boost.org/LICENSE_1_0.txt)
9
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12
13 #include <algorithm>
14 #include <memory>
15
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/utility.hpp>
20 #include <boost/utility/enable_if.hpp>
21
22 #include <boost/type_traits/has_trivial_destructor.hpp>
23 #include <boost/type_traits/is_convertible.hpp>
24
25 #include <boost/lockfree/detail/atomic.hpp>
26 #include <boost/lockfree/detail/branch_hints.hpp>
27 #include <boost/lockfree/detail/copy_payload.hpp>
28 #include <boost/lockfree/detail/parameter.hpp>
29 #include <boost/lockfree/detail/prefix.hpp>
30
31 #ifdef BOOST_HAS_PRAGMA_ONCE
32 #pragma once
33 #endif
34
35 namespace boost    {
36 namespace lockfree {
37 namespace detail   {
38
39 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
40                               boost::parameter::optional<tag::allocator>
41                              > ringbuffer_signature;
42
43 template <typename T>
44 class ringbuffer_base
45 {
46 #ifndef BOOST_DOXYGEN_INVOKED
47     typedef std::size_t size_t;
48     static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
49     atomic<size_t> write_index_;
50     char padding1[padding_size]; /* force read_index and write_index to different cache lines */
51     atomic<size_t> read_index_;
52
53     BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
54     BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))
55
56 protected:
57     ringbuffer_base(void):
58         write_index_(0), read_index_(0)
59     {}
60
61     static size_t next_index(size_t arg, size_t max_size)
62     {
63         size_t ret = arg + 1;
64         while (unlikely(ret >= max_size))
65             ret -= max_size;
66         return ret;
67     }
68
69     static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
70     {
71         if (write_index >= read_index)
72             return write_index - read_index;
73
74         const size_t ret = write_index + max_size - read_index;
75         return ret;
76     }
77
78     static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
79     {
80         size_t ret = read_index - write_index - 1;
81         if (write_index >= read_index)
82             ret += max_size;
83         return ret;
84     }
85
86     size_t read_available(size_t max_size) const
87     {
88         size_t write_index = write_index_.load(memory_order_relaxed);
89         const size_t read_index  = read_index_.load(memory_order_relaxed);
90         return read_available(write_index, read_index, max_size);
91     }
92
93     size_t write_available(size_t max_size) const
94     {
95         size_t write_index = write_index_.load(memory_order_relaxed);
96         const size_t read_index  = read_index_.load(memory_order_relaxed);
97         return write_available(write_index, read_index, max_size);
98     }
99
100     bool push(T const & t, T * buffer, size_t max_size)
101     {
102         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
103         const size_t next = next_index(write_index, max_size);
104
105         if (next == read_index_.load(memory_order_acquire))
106             return false; /* ringbuffer is full */
107
108         new (buffer + write_index) T(t); // copy-construct
109
110         write_index_.store(next, memory_order_release);
111
112         return true;
113     }
114
115     size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
116     {
117         return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
118     }
119
120     template <typename ConstIterator>
121     ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
122     {
123         // FIXME: avoid std::distance
124
125         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
126         const size_t read_index  = read_index_.load(memory_order_acquire);
127         const size_t avail = write_available(write_index, read_index, max_size);
128
129         if (avail == 0)
130             return begin;
131
132         size_t input_count = std::distance(begin, end);
133         input_count = (std::min)(input_count, avail);
134
135         size_t new_write_index = write_index + input_count;
136
137         const ConstIterator last = boost::next(begin, input_count);
138
139         if (write_index + input_count > max_size) {
140             /* copy data in two sections */
141             const size_t count0 = max_size - write_index;
142             const ConstIterator midpoint = boost::next(begin, count0);
143
144             std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
145             std::uninitialized_copy(midpoint, last, internal_buffer);
146             new_write_index -= max_size;
147         } else {
148             std::uninitialized_copy(begin, last, internal_buffer + write_index);
149
150             if (new_write_index == max_size)
151                 new_write_index = 0;
152         }
153
154         write_index_.store(new_write_index, memory_order_release);
155         return last;
156     }
157
158     template <typename Functor>
159     bool consume_one(Functor & functor, T * buffer, size_t max_size)
160     {
161         const size_t write_index = write_index_.load(memory_order_acquire);
162         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
163         if ( empty(write_index, read_index) )
164             return false;
165
166         T & object_to_consume = buffer[read_index];
167         functor( object_to_consume );
168         object_to_consume.~T();
169
170         size_t next = next_index(read_index, max_size);
171         read_index_.store(next, memory_order_release);
172         return true;
173     }
174
175     template <typename Functor>
176     bool consume_one(Functor const & functor, T * buffer, size_t max_size)
177     {
178         const size_t write_index = write_index_.load(memory_order_acquire);
179         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
180         if ( empty(write_index, read_index) )
181             return false;
182
183         T & object_to_consume = buffer[read_index];
184         functor( object_to_consume );
185         object_to_consume.~T();
186
187         size_t next = next_index(read_index, max_size);
188         read_index_.store(next, memory_order_release);
189         return true;
190     }
191
192     template <typename Functor>
193     size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
194     {
195         const size_t write_index = write_index_.load(memory_order_acquire);
196         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
197
198         const size_t avail = read_available(write_index, read_index, max_size);
199
200         if (avail == 0)
201             return 0;
202
203         const size_t output_count = avail;
204
205         size_t new_read_index = read_index + output_count;
206
207         if (read_index + output_count > max_size) {
208             /* copy data in two sections */
209             const size_t count0 = max_size - read_index;
210             const size_t count1 = output_count - count0;
211
212             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
213             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
214
215             new_read_index -= max_size;
216         } else {
217             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
218
219             if (new_read_index == max_size)
220                 new_read_index = 0;
221         }
222
223         read_index_.store(new_read_index, memory_order_release);
224         return output_count;
225     }
226
227     template <typename Functor>
228     size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
229     {
230         const size_t write_index = write_index_.load(memory_order_acquire);
231         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
232
233         const size_t avail = read_available(write_index, read_index, max_size);
234
235         if (avail == 0)
236             return 0;
237
238         const size_t output_count = avail;
239
240         size_t new_read_index = read_index + output_count;
241
242         if (read_index + output_count > max_size) {
243             /* copy data in two sections */
244             const size_t count0 = max_size - read_index;
245             const size_t count1 = output_count - count0;
246
247             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
248             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
249
250             new_read_index -= max_size;
251         } else {
252             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
253
254             if (new_read_index == max_size)
255                 new_read_index = 0;
256         }
257
258         read_index_.store(new_read_index, memory_order_release);
259         return output_count;
260     }
261
262     size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
263     {
264         const size_t write_index = write_index_.load(memory_order_acquire);
265         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
266
267         const size_t avail = read_available(write_index, read_index, max_size);
268
269         if (avail == 0)
270             return 0;
271
272         output_count = (std::min)(output_count, avail);
273
274         size_t new_read_index = read_index + output_count;
275
276         if (read_index + output_count > max_size) {
277             /* copy data in two sections */
278             const size_t count0 = max_size - read_index;
279             const size_t count1 = output_count - count0;
280
281             copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
282             copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
283
284             new_read_index -= max_size;
285         } else {
286             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
287             if (new_read_index == max_size)
288                 new_read_index = 0;
289         }
290
291         read_index_.store(new_read_index, memory_order_release);
292         return output_count;
293     }
294
295     template <typename OutputIterator>
296     size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
297     {
298         const size_t write_index = write_index_.load(memory_order_acquire);
299         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
300
301         const size_t avail = read_available(write_index, read_index, max_size);
302         if (avail == 0)
303             return 0;
304
305         size_t new_read_index = read_index + avail;
306
307         if (read_index + avail > max_size) {
308             /* copy data in two sections */
309             const size_t count0 = max_size - read_index;
310             const size_t count1 = avail - count0;
311
312             it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
313             copy_and_delete(internal_buffer, internal_buffer + count1, it);
314
315             new_read_index -= max_size;
316         } else {
317             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
318             if (new_read_index == max_size)
319                 new_read_index = 0;
320         }
321
322         read_index_.store(new_read_index, memory_order_release);
323         return avail;
324     }
325
326     const T& front(const T * internal_buffer) const
327     {
328         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
329         return *(internal_buffer + read_index);
330     }
331
332     T& front(T * internal_buffer)
333     {
334         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
335         return *(internal_buffer + read_index);
336     }
337 #endif
338
339
340 public:
341     /** reset the ringbuffer
342      *
343      * \note Not thread-safe
344      * */
345     void reset(void)
346     {
347         if ( !boost::has_trivial_destructor<T>::value ) {
348             // make sure to call all destructors!
349
350             T dummy_element;
351             while (pop(dummy_element))
352             {}
353         } else {
354             write_index_.store(0, memory_order_relaxed);
355             read_index_.store(0, memory_order_release);
356         }
357     }
358
359     /** Check if the ringbuffer is empty
360      *
361      * \return true, if the ringbuffer is empty, false otherwise
362      * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
363      * */
364     bool empty(void)
365     {
366         return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
367     }
368
369     /**
370      * \return true, if implementation is lock-free.
371      *
372      * */
373     bool is_lock_free(void) const
374     {
375         return write_index_.is_lock_free() && read_index_.is_lock_free();
376     }
377
378 private:
379     bool empty(size_t write_index, size_t read_index)
380     {
381         return write_index == read_index;
382     }
383
384     template< class OutputIterator >
385     OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
386     {
387         if (boost::has_trivial_destructor<T>::value) {
388             return std::copy(first, last, out); // will use memcpy if possible
389         } else {
390             for (; first != last; ++first, ++out) {
391                 *out = *first;
392                 first->~T();
393             }
394             return out;
395         }
396     }
397
398     template< class Functor >
399     void run_functor_and_delete( T * first, T * last, Functor & functor )
400     {
401         for (; first != last; ++first) {
402             functor(*first);
403             first->~T();
404         }
405     }
406
407     template< class Functor >
408     void run_functor_and_delete( T * first, T * last, Functor const & functor )
409     {
410         for (; first != last; ++first) {
411             functor(*first);
412             first->~T();
413         }
414     }
415 };
416
417 template <typename T, std::size_t MaxSize>
418 class compile_time_sized_ringbuffer:
419     public ringbuffer_base<T>
420 {
421     typedef std::size_t size_type;
422     static const std::size_t max_size = MaxSize + 1;
423
424     typedef typename boost::aligned_storage<max_size * sizeof(T),
425                                             boost::alignment_of<T>::value
426                                            >::type storage_type;
427
428     storage_type storage_;
429
430     T * data()
431     {
432         return static_cast<T*>(storage_.address());
433     }
434
435     const T * data() const
436     {
437         return static_cast<const T*>(storage_.address());
438     }
439
440 protected:
441     size_type max_number_of_elements() const
442     {
443         return max_size;
444     }
445
446 public:
447     bool push(T const & t)
448     {
449         return ringbuffer_base<T>::push(t, data(), max_size);
450     }
451
452     template <typename Functor>
453     bool consume_one(Functor & f)
454     {
455         return ringbuffer_base<T>::consume_one(f, data(), max_size);
456     }
457
458     template <typename Functor>
459     bool consume_one(Functor const & f)
460     {
461         return ringbuffer_base<T>::consume_one(f, data(), max_size);
462     }
463
464     template <typename Functor>
465     bool consume_all(Functor & f)
466     {
467         return ringbuffer_base<T>::consume_all(f, data(), max_size);
468     }
469
470     template <typename Functor>
471     bool consume_all(Functor const & f)
472     {
473         return ringbuffer_base<T>::consume_all(f, data(), max_size);
474     }
475
476     size_type push(T const * t, size_type size)
477     {
478         return ringbuffer_base<T>::push(t, size, data(), max_size);
479     }
480
481     template <size_type size>
482     size_type push(T const (&t)[size])
483     {
484         return push(t, size);
485     }
486
487     template <typename ConstIterator>
488     ConstIterator push(ConstIterator begin, ConstIterator end)
489     {
490         return ringbuffer_base<T>::push(begin, end, data(), max_size);
491     }
492
493     size_type pop(T * ret, size_type size)
494     {
495         return ringbuffer_base<T>::pop(ret, size, data(), max_size);
496     }
497
498     template <typename OutputIterator>
499     size_type pop_to_output_iterator(OutputIterator it)
500     {
501         return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
502     }
503
504     const T& front(void) const
505     {
506         return ringbuffer_base<T>::front(data());
507     }
508
509     T& front(void)
510     {
511         return ringbuffer_base<T>::front(data());
512     }
513 };
514
515 template <typename T, typename Alloc>
516 class runtime_sized_ringbuffer:
517     public ringbuffer_base<T>,
518     private Alloc
519 {
520     typedef std::size_t size_type;
521     size_type max_elements_;
522     typedef typename Alloc::pointer pointer;
523     pointer array_;
524
525 protected:
526     size_type max_number_of_elements() const
527     {
528         return max_elements_;
529     }
530
531 public:
532     explicit runtime_sized_ringbuffer(size_type max_elements):
533         max_elements_(max_elements + 1)
534     {
535         array_ = Alloc::allocate(max_elements_);
536     }
537
538     template <typename U>
539     runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
540         Alloc(alloc), max_elements_(max_elements + 1)
541     {
542         array_ = Alloc::allocate(max_elements_);
543     }
544
545     runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
546         Alloc(alloc), max_elements_(max_elements + 1)
547     {
548         array_ = Alloc::allocate(max_elements_);
549     }
550
551     ~runtime_sized_ringbuffer(void)
552     {
553         // destroy all remaining items
554         T out;
555         while (pop(&out, 1)) {}
556
557         Alloc::deallocate(array_, max_elements_);
558     }
559
560     bool push(T const & t)
561     {
562         return ringbuffer_base<T>::push(t, &*array_, max_elements_);
563     }
564
565     template <typename Functor>
566     bool consume_one(Functor & f)
567     {
568         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
569     }
570
571     template <typename Functor>
572     bool consume_one(Functor const & f)
573     {
574         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
575     }
576
577     template <typename Functor>
578     size_type consume_all(Functor & f)
579     {
580         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
581     }
582
583     template <typename Functor>
584     size_type consume_all(Functor const & f)
585     {
586         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
587     }
588
589     size_type push(T const * t, size_type size)
590     {
591         return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
592     }
593
594     template <size_type size>
595     size_type push(T const (&t)[size])
596     {
597         return push(t, size);
598     }
599
600     template <typename ConstIterator>
601     ConstIterator push(ConstIterator begin, ConstIterator end)
602     {
603         return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
604     }
605
606     size_type pop(T * ret, size_type size)
607     {
608         return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
609     }
610
611     template <typename OutputIterator>
612     size_type pop_to_output_iterator(OutputIterator it)
613     {
614         return ringbuffer_base<T>::pop_to_output_iterator(it, array_, max_elements_);
615     }
616
617     const T& front(void) const
618     {
619         return ringbuffer_base<T>::front(array_);
620     }
621
622     T& front(void)
623     {
624         return ringbuffer_base<T>::front(array_);
625     }
626 };
627
628 template <typename T, typename A0, typename A1>
629 struct make_ringbuffer
630 {
631     typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
632
633     typedef extract_capacity<bound_args> extract_capacity_t;
634
635     static const bool runtime_sized = !extract_capacity_t::has_capacity;
636     static const size_t capacity    =  extract_capacity_t::capacity;
637
638     typedef extract_allocator<bound_args, T> extract_allocator_t;
639     typedef typename extract_allocator_t::type allocator;
640
641     // allocator argument is only sane, for run-time sized ringbuffers
642     BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
643                                   mpl::bool_<!extract_allocator_t::has_allocator>,
644                                   mpl::true_
645                                  >::type::value));
646
647     typedef typename mpl::if_c<runtime_sized,
648                                runtime_sized_ringbuffer<T, allocator>,
649                                compile_time_sized_ringbuffer<T, capacity>
650                               >::type ringbuffer_type;
651 };
652
653
654 } /* namespace detail */
655
656
657 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
658  *
659  *  \b Policies:
660  *  - \c boost::lockfree::capacity<>, optional <br>
661  *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
662  *
663  *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
664  *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
665  *    to be sized at run-time
666  *
667  *  \b Requirements:
668  *  - T must have a default constructor
669  *  - T must be copyable
670  * */
671 #ifndef BOOST_DOXYGEN_INVOKED
672 template <typename T,
673           class A0 = boost::parameter::void_,
674           class A1 = boost::parameter::void_>
675 #else
676 template <typename T, ...Options>
677 #endif
678 class spsc_queue:
679     public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
680 {
681 private:
682
683 #ifndef BOOST_DOXYGEN_INVOKED
684     typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
685     static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
686     typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
687
688     struct implementation_defined
689     {
690         typedef allocator_arg allocator;
691         typedef std::size_t size_type;
692     };
693 #endif
694
695 public:
696     typedef T value_type;
697     typedef typename implementation_defined::allocator allocator;
698     typedef typename implementation_defined::size_type size_type;
699
700     /** Constructs a spsc_queue
701      *
702      *  \pre spsc_queue must be configured to be sized at compile-time
703      */
704     // @{
705     spsc_queue(void)
706     {
707         BOOST_ASSERT(!runtime_sized);
708     }
709
710     template <typename U>
711     explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
712     {
713         // just for API compatibility: we don't actually need an allocator
714         BOOST_STATIC_ASSERT(!runtime_sized);
715     }
716
717     explicit spsc_queue(allocator const & alloc)
718     {
719         // just for API compatibility: we don't actually need an allocator
720         BOOST_ASSERT(!runtime_sized);
721     }
722     // @}
723
724
725     /** Constructs a spsc_queue for element_count elements
726      *
727      *  \pre spsc_queue must be configured to be sized at run-time
728      */
729     // @{
730     explicit spsc_queue(size_type element_count):
731         base_type(element_count)
732     {
733         BOOST_ASSERT(runtime_sized);
734     }
735
736     template <typename U>
737     spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
738         base_type(alloc, element_count)
739     {
740         BOOST_STATIC_ASSERT(runtime_sized);
741     }
742
743     spsc_queue(size_type element_count, allocator_arg const & alloc):
744         base_type(alloc, element_count)
745     {
746         BOOST_ASSERT(runtime_sized);
747     }
748     // @}
749
750     /** Pushes object t to the ringbuffer.
751      *
752      * \pre only one thread is allowed to push data to the spsc_queue
753      * \post object will be pushed to the spsc_queue, unless it is full.
754      * \return true, if the push operation is successful.
755      *
756      * \note Thread-safe and wait-free
757      * */
758     bool push(T const & t)
759     {
760         return base_type::push(t);
761     }
762
763     /** Pops one object from ringbuffer.
764      *
765      * \pre only one thread is allowed to pop data to the spsc_queue
766      * \post if ringbuffer is not empty, object will be discarded.
767      * \return true, if the pop operation is successful, false if ringbuffer was empty.
768      *
769      * \note Thread-safe and wait-free
770      */
771     bool pop ()
772     {
773         detail::consume_noop consume_functor;
774         return consume_one( consume_functor );
775     }
776
777     /** Pops one object from ringbuffer.
778      *
779      * \pre only one thread is allowed to pop data to the spsc_queue
780      * \post if ringbuffer is not empty, object will be copied to ret.
781      * \return true, if the pop operation is successful, false if ringbuffer was empty.
782      *
783      * \note Thread-safe and wait-free
784      */
785     template <typename U>
786     typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
787     pop (U & ret)
788     {
789         detail::consume_via_copy<U> consume_functor(ret);
790         return consume_one( consume_functor );
791     }
792
793     /** Pushes as many objects from the array t as there is space.
794      *
795      * \pre only one thread is allowed to push data to the spsc_queue
796      * \return number of pushed items
797      *
798      * \note Thread-safe and wait-free
799      */
800     size_type push(T const * t, size_type size)
801     {
802         return base_type::push(t, size);
803     }
804
805     /** Pushes as many objects from the array t as there is space available.
806      *
807      * \pre only one thread is allowed to push data to the spsc_queue
808      * \return number of pushed items
809      *
810      * \note Thread-safe and wait-free
811      */
812     template <size_type size>
813     size_type push(T const (&t)[size])
814     {
815         return push(t, size);
816     }
817
818     /** Pushes as many objects from the range [begin, end) as there is space .
819      *
820      * \pre only one thread is allowed to push data to the spsc_queue
821      * \return iterator to the first element, which has not been pushed
822      *
823      * \note Thread-safe and wait-free
824      */
825     template <typename ConstIterator>
826     ConstIterator push(ConstIterator begin, ConstIterator end)
827     {
828         return base_type::push(begin, end);
829     }
830
831     /** Pops a maximum of size objects from ringbuffer.
832      *
833      * \pre only one thread is allowed to pop data to the spsc_queue
834      * \return number of popped items
835      *
836      * \note Thread-safe and wait-free
837      * */
838     size_type pop(T * ret, size_type size)
839     {
840         return base_type::pop(ret, size);
841     }
842
843     /** Pops a maximum of size objects from spsc_queue.
844      *
845      * \pre only one thread is allowed to pop data to the spsc_queue
846      * \return number of popped items
847      *
848      * \note Thread-safe and wait-free
849      * */
850     template <size_type size>
851     size_type pop(T (&ret)[size])
852     {
853         return pop(ret, size);
854     }
855
856     /** Pops objects to the output iterator it
857      *
858      * \pre only one thread is allowed to pop data to the spsc_queue
859      * \return number of popped items
860      *
861      * \note Thread-safe and wait-free
862      * */
863     template <typename OutputIterator>
864     typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
865     pop(OutputIterator it)
866     {
867         return base_type::pop_to_output_iterator(it);
868     }
869
870     /** consumes one element via a functor
871      *
872      *  pops one element from the queue and applies the functor on this object
873      *
874      * \returns true, if one element was consumed
875      *
876      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
877      * */
878     template <typename Functor>
879     bool consume_one(Functor & f)
880     {
881         return base_type::consume_one(f);
882     }
883
884     /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
885     template <typename Functor>
886     bool consume_one(Functor const & f)
887     {
888         return base_type::consume_one(f);
889     }
890
891     /** consumes all elements via a functor
892      *
893      * sequentially pops all elements from the queue and applies the functor on each object
894      *
895      * \returns number of elements that are consumed
896      *
897      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
898      * */
899     template <typename Functor>
900     size_type consume_all(Functor & f)
901     {
902         return base_type::consume_all(f);
903     }
904
905     /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
906     template <typename Functor>
907     size_type consume_all(Functor const & f)
908     {
909         return base_type::consume_all(f);
910     }
911
912     /** get number of elements that are available for read
913      *
914      * \return number of available elements that can be popped from the spsc_queue
915      *
916      * \note Thread-safe and wait-free, should only be called from the producer thread
917      * */
918     size_type read_available() const
919     {
920         return base_type::read_available(base_type::max_number_of_elements());
921     }
922
923     /** get write space to write elements
924      *
925      * \return number of elements that can be pushed to the spsc_queue
926      *
927      * \note Thread-safe and wait-free, should only be called from the consumer thread
928      * */
929     size_type write_available() const
930     {
931         return base_type::write_available(base_type::max_number_of_elements());
932     }
933
934     /** get reference to element in the front of the queue
935      *
936      * Availability of front element can be checked using read_available().
937      *
938      * \pre only one thread is allowed to check front element
939      * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
940      * \return reference to the first element in the queue
941      *
942      * \note Thread-safe and wait-free
943      */
944     const T& front() const
945     {
946         BOOST_ASSERT(read_available() > 0);
947         return base_type::front();
948     }
949
950     /// \copydoc boost::lockfree::spsc_queue::front() const
951     T& front()
952     {
953         BOOST_ASSERT(read_available() > 0);
954         return base_type::front();
955     }
956 };
957
958 } /* namespace lockfree */
959 } /* namespace boost */
960
961
962 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */