Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / include / tbb / concurrent_queue.h
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19
20 #define __TBB_concurrent_queue_H_include_area
21 #include "internal/_warning_suppress_enable_notice.h"
22
23 #include "internal/_concurrent_queue_impl.h"
24 #include "internal/_allocator_traits.h"
25
26 namespace tbb {
27
28 namespace strict_ppl {
29
30 //! A high-performance thread-safe non-blocking concurrent queue.
31 /** Multiple threads may each push and pop concurrently.
32     Assignment construction is not allowed.
33     @ingroup containers */
34 template<typename T, typename A = cache_aligned_allocator<T> >
35 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
36     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
37
38     //! Allocator type
39     typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
40     page_allocator_type my_allocator;
41
42     //! Allocates a block of size n (bytes)
43     virtual void *allocate_block( size_t n ) __TBB_override {
44         void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
45         if( !b )
46             internal::throw_exception(internal::eid_bad_alloc);
47         return b;
48     }
49
50     //! Deallocates block created by allocate_block.
51     virtual void deallocate_block( void *b, size_t n ) __TBB_override {
52         my_allocator.deallocate( reinterpret_cast<char*>(b), n );
53     }
54
55     static void copy_construct_item(T* location, const void* src){
56         new (location) T(*static_cast<const T*>(src));
57     }
58
59 #if __TBB_CPP11_RVALUE_REF_PRESENT
60     static void move_construct_item(T* location, const void* src) {
61         new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
62     }
63 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
64 public:
65     //! Element type in the queue.
66     typedef T value_type;
67
68     //! Reference type
69     typedef T& reference;
70
71     //! Const reference type
72     typedef const T& const_reference;
73
74     //! Integral type for representing size of the queue.
75     typedef size_t size_type;
76
77     //! Difference type for iterator
78     typedef ptrdiff_t difference_type;
79
80     //! Allocator type
81     typedef A allocator_type;
82
83     //! Construct empty queue
84     explicit concurrent_queue(const allocator_type& a = allocator_type()) :
85         my_allocator( a )
86     {
87     }
88
89     //! [begin,end) constructor
90     template<typename InputIterator>
91     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
92         my_allocator( a )
93     {
94         for( ; begin != end; ++begin )
95             this->push(*begin);
96     }
97
98     //! Copy constructor
99     concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
100         internal::concurrent_queue_base_v3<T>(), my_allocator( a )
101     {
102         this->assign( src, copy_construct_item );
103     }
104
105 #if __TBB_CPP11_RVALUE_REF_PRESENT
106     //! Move constructors
107     concurrent_queue( concurrent_queue&& src ) :
108         internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
109     {
110         this->internal_swap( src );
111     }
112
113     concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
114         internal::concurrent_queue_base_v3<T>(), my_allocator( a )
115     {
116         // checking that memory allocated by one instance of allocator can be deallocated
117         // with another
118         if( my_allocator == src.my_allocator) {
119             this->internal_swap( src );
120         } else {
121             // allocators are different => performing per-element move
122             this->assign( src, move_construct_item );
123             src.clear();
124         }
125     }
126 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
127
128     //! Destroy queue
129     ~concurrent_queue();
130
131     //! Enqueue an item at tail of queue.
132     void push( const T& source ) {
133         this->internal_push( &source, copy_construct_item );
134     }
135
136 #if __TBB_CPP11_RVALUE_REF_PRESENT
137     void push( T&& source ) {
138         this->internal_push( &source, move_construct_item );
139     }
140
141 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
142     template<typename... Arguments>
143     void emplace( Arguments&&... args ) {
144         push( T(std::forward<Arguments>( args )...) );
145     }
146 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
147 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
148
149     //! Attempt to dequeue an item from head of queue.
150     /** Does not wait for item to become available.
151         Returns true if successful; false otherwise. */
152     bool try_pop( T& result ) {
153         return this->internal_try_pop( &result );
154     }
155
156     //! Return the number of items in the queue; thread unsafe
157     size_type unsafe_size() const {return this->internal_size();}
158
159     //! Equivalent to size()==0.
160     bool empty() const {return this->internal_empty();}
161
162     //! Clear the queue. not thread-safe.
163     void clear() ;
164
165     //! Return allocator object
166     allocator_type get_allocator() const { return this->my_allocator; }
167
168     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
169     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
170
171     //------------------------------------------------------------------------
172     // The iterators are intended only for debugging.  They are slow and not thread safe.
173     //------------------------------------------------------------------------
174     iterator unsafe_begin() {return iterator(*this);}
175     iterator unsafe_end() {return iterator();}
176     const_iterator unsafe_begin() const {return const_iterator(*this);}
177     const_iterator unsafe_end() const {return const_iterator();}
178 } ;
179
180 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
181 // Deduction guide for the constructor from two iterators
182 template<typename InputIterator,
183          typename T = typename std::iterator_traits<InputIterator>::value_type,
184          typename A = cache_aligned_allocator<T>
185 > concurrent_queue(InputIterator, InputIterator, const A& = A())
186 -> concurrent_queue<T, A>;
187 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
188
189 template<typename T, class A>
190 concurrent_queue<T,A>::~concurrent_queue() {
191     clear();
192     this->internal_finish_clear();
193 }
194
195 template<typename T, class A>
196 void concurrent_queue<T,A>::clear() {
197     T value;
198     while( !empty() ) try_pop(value);
199 }
200
201 } // namespace strict_ppl
202
203 //! A high-performance thread-safe blocking concurrent bounded queue.
204 /** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
205     Note that method names agree with the PPL-style concurrent queue.
206     Multiple threads may each push and pop concurrently.
207     Assignment construction is not allowed.
208     @ingroup containers */
209 template<typename T, class A = cache_aligned_allocator<T> >
210 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
211     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
212     typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
213
214     //! Allocator type
215     page_allocator_type my_allocator;
216
217     typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
218     typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics;
219
220     //! Class used to ensure exception-safety of method "pop"
221     class destroyer: internal::no_copy {
222         T& my_value;
223     public:
224         destroyer( T& value ) : my_value(value) {}
225         ~destroyer() {my_value.~T();}
226     };
227
228     T& get_ref( page& p, size_t index ) {
229         __TBB_ASSERT( index<items_per_page, NULL );
230         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
231     }
232
233     virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
234         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
235     }
236
237 #if __TBB_CPP11_RVALUE_REF_PRESENT
238     virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
239         new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
240     }
241 #else
242     virtual void move_item( page&, size_t, const void* ) __TBB_override {
243         __TBB_ASSERT( false, "Unreachable code" );
244     }
245 #endif
246
247     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
248         new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
249     }
250
251 #if __TBB_CPP11_RVALUE_REF_PRESENT
252     virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
253         new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
254     }
255 #else
256     virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
257         __TBB_ASSERT( false, "Unreachable code" );
258     }
259 #endif
260
261     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
262         T& from = get_ref(src,index);
263         destroyer d(from);
264         *static_cast<T*>(dst) = tbb::internal::move( from );
265     }
266
267     virtual page *allocate_page() __TBB_override {
268         size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
269         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
270         if( !p )
271             internal::throw_exception(internal::eid_bad_alloc);
272         return p;
273     }
274
275     virtual void deallocate_page( page *p ) __TBB_override {
276         size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
277         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
278     }
279
280 public:
281     //! Element type in the queue.
282     typedef T value_type;
283
284     //! Allocator type
285     typedef A allocator_type;
286
287     //! Reference type
288     typedef T& reference;
289
290     //! Const reference type
291     typedef const T& const_reference;
292
293     //! Integral type for representing size of the queue.
294     /** Note that the size_type is a signed integral type.
295         This is because the size can be negative if there are pending pops without corresponding pushes. */
296     typedef std::ptrdiff_t size_type;
297
298     //! Difference type for iterator
299     typedef std::ptrdiff_t difference_type;
300
301     //! Construct empty queue
302     explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
303         concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
304     {
305     }
306
307     //! Copy constructor
308     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
309         : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
310     {
311         assign( src );
312     }
313
314 #if __TBB_CPP11_RVALUE_REF_PRESENT
315     //! Move constructors
316     concurrent_bounded_queue( concurrent_bounded_queue&& src )
317         : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
318     {
319         internal_swap( src );
320     }
321
322     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
323         : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
324     {
325         // checking that memory allocated by one instance of allocator can be deallocated
326         // with another
327         if( my_allocator == src.my_allocator) {
328             this->internal_swap( src );
329         } else {
330             // allocators are different => performing per-element move
331             this->move_content( src );
332             src.clear();
333         }
334     }
335 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
336
337     //! [begin,end) constructor
338     template<typename InputIterator>
339     concurrent_bounded_queue( InputIterator begin, InputIterator end,
340                               const allocator_type& a = allocator_type())
341         : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
342     {
343         for( ; begin != end; ++begin )
344             internal_push_if_not_full(&*begin);
345     }
346
347     //! Destroy queue
348     ~concurrent_bounded_queue();
349
350     //! Enqueue an item at tail of queue.
351     void push( const T& source ) {
352         internal_push( &source );
353     }
354
355 #if __TBB_CPP11_RVALUE_REF_PRESENT
356     //! Move an item at tail of queue.
357     void push( T&& source ) {
358         internal_push_move( &source );
359     }
360
361 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
362     template<typename... Arguments>
363     void emplace( Arguments&&... args ) {
364         push( T(std::forward<Arguments>( args )...) );
365     }
366 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
367 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
368
369     //! Dequeue item from head of queue.
370     /** Block until an item becomes available, and then dequeue it. */
371     void pop( T& destination ) {
372         internal_pop( &destination );
373     }
374
375 #if TBB_USE_EXCEPTIONS
376     //! Abort all pending queue operations
377     void abort() {
378         internal_abort();
379     }
380 #endif
381
382     //! Enqueue an item at tail of queue if queue is not already full.
383     /** Does not wait for queue to become not full.
384         Returns true if item is pushed; false if queue was already full. */
385     bool try_push( const T& source ) {
386         return internal_push_if_not_full( &source );
387     }
388
389 #if __TBB_CPP11_RVALUE_REF_PRESENT
390     //! Move an item at tail of queue if queue is not already full.
391     /** Does not wait for queue to become not full.
392         Returns true if item is pushed; false if queue was already full. */
393     bool try_push( T&& source ) {
394         return internal_push_move_if_not_full( &source );
395     }
396 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
397     template<typename... Arguments>
398     bool try_emplace( Arguments&&... args ) {
399         return try_push( T(std::forward<Arguments>( args )...) );
400     }
401 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
402 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
403
404     //! Attempt to dequeue an item from head of queue.
405     /** Does not wait for item to become available.
406         Returns true if successful; false otherwise. */
407     bool try_pop( T& destination ) {
408         return internal_pop_if_present( &destination );
409     }
410
411     //! Return number of pushes minus number of pops.
412     /** Note that the result can be negative if there are pops waiting for the
413         corresponding pushes.  The result can also exceed capacity() if there
414         are push operations in flight. */
415     size_type size() const {return internal_size();}
416
417     //! Equivalent to size()<=0.
418     bool empty() const {return internal_empty();}
419
420     //! Maximum number of allowed elements
421     size_type capacity() const {
422         return my_capacity;
423     }
424
425     //! Set the capacity
426     /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
427         and subsequent push operations to block forever. */
428     void set_capacity( size_type new_capacity ) {
429         internal_set_capacity( new_capacity, sizeof(T) );
430     }
431
432     //! return allocator object
433     allocator_type get_allocator() const { return this->my_allocator; }
434
435     //! clear the queue. not thread-safe.
436     void clear() ;
437
438     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
439     typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
440
441     //------------------------------------------------------------------------
442     // The iterators are intended only for debugging.  They are slow and not thread safe.
443     //------------------------------------------------------------------------
444     iterator unsafe_begin() {return iterator(*this);}
445     iterator unsafe_end() {return iterator();}
446     const_iterator unsafe_begin() const {return const_iterator(*this);}
447     const_iterator unsafe_end() const {return const_iterator();}
448
449 };
450
451 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
452 // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
453 template<typename InputIterator,
454          typename T = typename std::iterator_traits<InputIterator>::value_type,
455          typename A = cache_aligned_allocator<T>
456 > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
457 -> concurrent_bounded_queue<T, A>;
458 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
459
460 template<typename T, class A>
461 concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
462     clear();
463     internal_finish_clear();
464 }
465
466 template<typename T, class A>
467 void concurrent_bounded_queue<T,A>::clear() {
468     T value;
469     while( try_pop(value) ) /*noop*/;
470 }
471
472 using strict_ppl::concurrent_queue;
473
474 } // namespace tbb
475
476 #include "internal/_warning_suppress_disable_notice.h"
477 #undef __TBB_concurrent_queue_H_include_area
478
479 #endif /* __TBB_concurrent_queue_H */